From fa9d3c59f72ecfad2a085833d068417a5e062aaa Mon Sep 17 00:00:00 2001 From: erwinvaneyk Date: Wed, 28 Feb 2018 13:46:36 +0100 Subject: [PATCH 1/4] gosimple fixes --- pkg/types/aggregates/invocation.go | 6 +----- pkg/types/helpers.go | 6 ++---- 2 files changed, 3 insertions(+), 9 deletions(-) diff --git a/pkg/types/aggregates/invocation.go b/pkg/types/aggregates/invocation.go index 91a72b27..d475734d 100644 --- a/pkg/types/aggregates/invocation.go +++ b/pkg/types/aggregates/invocation.go @@ -114,11 +114,7 @@ func (wi *WorkflowInvocation) ApplyEvent(event *fes.Event) error { "event": event, }).Warn("Skipping unimplemented event.") } - if err != nil { - return err - } - - return nil + return err } func (wi *WorkflowInvocation) applyTaskEvent(event *fes.Event) error { diff --git a/pkg/types/helpers.go b/pkg/types/helpers.go index 99b6498d..0f5cff04 100644 --- a/pkg/types/helpers.go +++ b/pkg/types/helpers.go @@ -180,10 +180,8 @@ func SingleDefaultInput(t *TypedValue) map[string]*TypedValue { type Requires map[string]*TaskDependencyParameters func (r Requires) Add(s ...string) Requires { - if s != nil { - for _, v := range s { - r[v] = nil - } + for _, v := range s { + r[v] = nil } return r } From a8c2d46d4ab73ad20ab9ac66a7d7db002f72fffb Mon Sep 17 00:00:00 2001 From: erwinvaneyk Date: Fri, 18 May 2018 22:35:59 +0200 Subject: [PATCH 2/4] Added initial prometheus metric collection --- .gitignore | 4 + Docs/wip/instrumentation.md | 24 +- INSTALL.md | 4 +- .../templates/deployment.yaml | 3 +- cmd/fission-workflows-bundle/bundle/bundle.go | 108 ++++++--- cmd/fission-workflows-bundle/main.go | 5 + glide.lock | 205 ++++++++++-------- glide.yaml | 2 + pkg/fes/backend/nats/client.go | 37 +++- pkg/fes/backend/nats/nats.go | 11 +- pkg/fes/caches.go | 17 +- pkg/fnenv/native/native.go | 44 +++- 12 files changed, 317 insertions(+), 147 deletions(-) diff --git a/.gitignore b/.gitignore index 12c49761..4c129f1c 100644 --- a/.gitignore +++ b/.gitignore @@ -4,7 +4,11 @@ data/ *.tgz /fission-workflows-bundle +/fission-workflows-bundle-osx +/fission-workflows-bundle-windows /wfcli +/wfcli-osx +/wfcli-windows __pycache__/ *.pyc diff --git a/Docs/wip/instrumentation.md b/Docs/wip/instrumentation.md index b2c2f95e..16b60a66 100644 --- a/Docs/wip/instrumentation.md +++ b/Docs/wip/instrumentation.md @@ -3,12 +3,24 @@ This document contains description and high-level documentation on the instrumentation of the system. The instrumentation here is considered to encompass tracing, logging and metrics. -## Logging - -TODO - -Common fields: +Terminology: - ctrl: name of the controller if applicable - wf: id of the workflow if applicable - wfi: id of the workflow invocation if applicable -- component: name of component (controller, api, apiserver, fnenv) \ No newline at end of file +- component: name of component (controller, api, apiserver, fnenv) + +## Logging + +## Metrics + +For metrics the Prometheus time series and monitoring system is used. +The following metrics are collected and available under `:8080/metrics` in the Prometheus data format. + +Metrics: +- Active invocations (Gauge) +- Cache size (Gauge) + +- Completed invocations (Counter) + - Failed invocations (Counter) + - Successful invocations (Counter) + - Aborted invocations (Counter) diff --git a/INSTALL.md b/INSTALL.md index 06497c1f..9ff4e526 100644 --- a/INSTALL.md +++ b/INSTALL.md @@ -38,8 +38,8 @@ helm repo update # Install Fission # This assumes that you do not have a Fission deployment yet, and are installing on a standard Minikube deployment. -# Otherwise see http://fission.io/docs/0.4.0/install/ for more detailed instructions -helm install --wait -n fission-all --namespace fission --set serviceType=NodePort --set analytics=false fission-charts/fission-all --version 0.4.1 +# Otherwise see http://fission.io/docs/0.7.2/install/ for more detailed instructions +helm install --wait -n fission-all --namespace fission --set serviceType=NodePort --set analytics=false fission-charts/fission-all --version 0.7.2 # Install Fission Workflows helm install --wait -n fission-workflows fission-charts/fission-workflows --version 0.3.0 diff --git a/charts/fission-workflows/templates/deployment.yaml b/charts/fission-workflows/templates/deployment.yaml index 52c7a672..f45efba2 100644 --- a/charts/fission-workflows/templates/deployment.yaml +++ b/charts/fission-workflows/templates/deployment.yaml @@ -25,6 +25,7 @@ spec: "--api-workflow-invocation", "--api-workflow", "--api-admin", + "--metrics", ] env: - name: ES_NATS_URL @@ -90,4 +91,4 @@ spec: builder: image: "{{ .Values.buildEnvImage }}:{{.Values.tag}}" command: "defaultBuild" - allowedFunctionsPerContainer: infinite + allowedFunctionsPerContainer: infinite \ No newline at end of file diff --git a/cmd/fission-workflows-bundle/bundle/bundle.go b/cmd/fission-workflows-bundle/bundle/bundle.go index a3b445e3..0e9f340b 100644 --- a/cmd/fission-workflows-bundle/bundle/bundle.go +++ b/cmd/fission-workflows-bundle/bundle/bundle.go @@ -13,6 +13,7 @@ import ( wfictr "github.com/fission/fission-workflows/pkg/controller/invocation" wfctr "github.com/fission/fission-workflows/pkg/controller/workflow" "github.com/fission/fission-workflows/pkg/fes" + "github.com/fission/fission-workflows/pkg/fes/backend/mem" "github.com/fission/fission-workflows/pkg/fes/backend/nats" "github.com/fission/fission-workflows/pkg/fnenv" "github.com/fission/fission-workflows/pkg/fnenv/fission" @@ -29,6 +30,7 @@ import ( executor "github.com/fission/fission/executor/client" "github.com/gorilla/handlers" grpcruntime "github.com/grpc-ecosystem/grpc-gateway/runtime" + "github.com/prometheus/client_golang/prometheus/promhttp" log "github.com/sirupsen/logrus" "google.golang.org/grpc" ) @@ -49,6 +51,7 @@ type Options struct { WorkflowAPI bool HTTPGateway bool InvocationAPI bool + Metrics bool } type FissionOptions struct { @@ -77,6 +80,11 @@ func Run(ctx context.Context, opts *Options) error { natsEs := setupNatsEventStoreClient(opts.Nats.URL, opts.Nats.Cluster, opts.Nats.Client) es = natsEs esPub = natsEs + } else { + log.Warn("No event store provided; using the development, in-memory event store") + backend := mem.NewBackend() + es = backend + esPub = backend } // Caches @@ -88,9 +96,13 @@ func Run(ctx context.Context, opts *Options) error { resolvers := map[string]fnenv.RuntimeResolver{} runtimes := map[string]fnenv.Runtime{} - log.Infof("Using Task Runtime: Workflow") - reflectiveRuntime := workflows.NewRuntime(invocationAPI, wfiCache()) - runtimes[workflows.Name] = reflectiveRuntime + if opts.InternalRuntime || opts.Fission != nil { + log.Infof("Using Task Runtime: Workflow") + reflectiveRuntime := workflows.NewRuntime(invocationAPI, wfiCache()) + runtimes[workflows.Name] = reflectiveRuntime + } else { + log.Info("No function runtimes specified.") + } if opts.InternalRuntime { log.Infof("Using Task Runtime: Internal") runtimes["internal"] = setupInternalFunctionRuntime() @@ -125,9 +137,18 @@ func Run(ctx context.Context, opts *Options) error { // Http servers if opts.Fission != nil { + proxyMux := http.NewServeMux() + runFissionEnvironmentProxy(proxyMux, es, wfiCache(), wfCache(), resolvers) proxySrv := &http.Server{Addr: fissionProxyAddress} + proxySrv.Handler = handlers.LoggingHandler(os.Stdout, proxyMux) + + if opts.Metrics { + setupMetricsEndpoint(proxyMux) + } + + go proxySrv.ListenAndServe() defer proxySrv.Shutdown(ctx) - runFissionEnvironmentProxy(proxySrv, es, wfiCache(), wfCache(), resolvers) + log.Info("Serving HTTP Fission Proxy at: ", proxySrv.Addr) } if opts.AdminAPI { @@ -152,20 +173,41 @@ func Run(ctx context.Context, opts *Options) error { go grpcServer.Serve(lis) } - if opts.HTTPGateway { - HTTPGatewaySrv := &http.Server{Addr: apiGatewayAddress} - defer HTTPGatewaySrv.Shutdown(ctx) - var admin, wf, wfi string - if opts.AdminAPI { - admin = gRPCAddress + if opts.HTTPGateway || opts.Metrics { + grpcMux := grpcruntime.NewServeMux() + httpMux := http.NewServeMux() + + if opts.HTTPGateway { + + var admin, wf, wfi string + if opts.AdminAPI { + admin = gRPCAddress + } + if opts.WorkflowAPI { + wf = gRPCAddress + } + if opts.InvocationAPI { + wfi = gRPCAddress + } + serveHTTPGateway(ctx, grpcMux, admin, wf, wfi) } - if opts.WorkflowAPI { - wf = gRPCAddress - } - if opts.InvocationAPI { - wfi = gRPCAddress + + // Metrics + if opts.Metrics { + setupMetricsEndpoint(httpMux) + log.Infof("Set up prometheus collector: %v/metrics", apiGatewayAddress) } - serveHTTPGateway(ctx, HTTPGatewaySrv, admin, wf, wfi) + + apiSrv := &http.Server{Addr: apiGatewayAddress} + httpMux.Handle("/", grpcMux) + apiSrv.Handler = httpMux + go func() { + err := apiSrv.ListenAndServe() + log.WithField("err", err).Info("HTTP Gateway exited") + }() + defer apiSrv.Shutdown(ctx) + + log.Info("Serving HTTP API gateway at: ", apiSrv.Addr) } log.Info("Bundle set up.") @@ -285,40 +327,35 @@ func serveInvocationAPI(s *grpc.Server, es fes.Backend, wfiCache fes.CacheReader log.Infof("Serving workflow invocation gRPC API at %s.", gRPCAddress) } -func serveHTTPGateway(ctx context.Context, gwSrv *http.Server, adminAPIAddr string, workflowAPIAddr string, invocationAPIAddr string) { - mux := grpcruntime.NewServeMux() - grpcOpts := []grpc.DialOption{grpc.WithInsecure()} +func serveHTTPGateway(ctx context.Context, mux *grpcruntime.ServeMux, adminAPIAddr string, workflowAPIAddr string, + invocationAPIAddr string) { + opts := []grpc.DialOption{grpc.WithInsecure()} if adminAPIAddr != "" { - err := apiserver.RegisterAdminAPIHandlerFromEndpoint(ctx, mux, adminAPIAddr, grpcOpts) + err := apiserver.RegisterAdminAPIHandlerFromEndpoint(ctx, mux, adminAPIAddr, opts) if err != nil { panic(err) } + log.Info("Registered Workflow API HTTP Endpoint") } if workflowAPIAddr != "" { - err := apiserver.RegisterWorkflowAPIHandlerFromEndpoint(ctx, mux, workflowAPIAddr, grpcOpts) + err := apiserver.RegisterWorkflowAPIHandlerFromEndpoint(ctx, mux, workflowAPIAddr, opts) if err != nil { panic(err) } + log.Info("Registered Admin API HTTP Endpoint") } if invocationAPIAddr != "" { - err := apiserver.RegisterWorkflowInvocationAPIHandlerFromEndpoint(ctx, mux, invocationAPIAddr, grpcOpts) + err := apiserver.RegisterWorkflowInvocationAPIHandlerFromEndpoint(ctx, mux, invocationAPIAddr, opts) if err != nil { panic(err) } + log.Info("Registered Workflow Invocation API HTTP Endpoint") } - - gwSrv.Handler = mux - go func() { - err := gwSrv.ListenAndServe() - log.WithField("err", err).Info("HTTP Gateway exited") - }() - - log.Info("Serving HTTP API gateway at: ", gwSrv.Addr) } -func runFissionEnvironmentProxy(proxySrv *http.Server, es fes.Backend, wfiCache fes.CacheReader, +func runFissionEnvironmentProxy(proxyMux *http.ServeMux, es fes.Backend, wfiCache fes.CacheReader, wfCache fes.CacheReader, resolvers map[string]fnenv.RuntimeResolver) { workflowParser := fnenv.NewMetaResolver(resolvers) @@ -326,13 +363,8 @@ func runFissionEnvironmentProxy(proxySrv *http.Server, es fes.Backend, wfiCache wfServer := apiserver.NewWorkflow(workflowAPI, wfCache) wfiAPI := api.NewInvocationAPI(es) wfiServer := apiserver.NewInvocation(wfiAPI, wfiCache) - proxyMux := http.NewServeMux() fissionProxyServer := fission.NewFissionProxyServer(wfiServer, wfServer) fissionProxyServer.RegisterServer(proxyMux) - - proxySrv.Handler = handlers.LoggingHandler(os.Stdout, proxyMux) - go proxySrv.ListenAndServe() - log.Info("Serving HTTP Fission Proxy at: ", proxySrv.Addr) } func setupInvocationController(invocationCache fes.CacheReader, wfCache fes.CacheReader, es fes.Backend, @@ -356,3 +388,7 @@ func runController(ctx context.Context, ctrls ...controller.Controller) { ctrl := controller.NewMetaController(ctrls...) go ctrl.Run(ctx) } + +func setupMetricsEndpoint(apiMux *http.ServeMux) { + apiMux.Handle("/metrics", promhttp.Handler()) +} diff --git a/cmd/fission-workflows-bundle/main.go b/cmd/fission-workflows-bundle/main.go index 506011c3..0350ccad 100644 --- a/cmd/fission-workflows-bundle/main.go +++ b/cmd/fission-workflows-bundle/main.go @@ -30,6 +30,7 @@ func main() { WorkflowAPI: c.Bool("api") || c.Bool("api-workflow"), InvocationAPI: c.Bool("api") || c.Bool("api-workflow-invocation"), HTTPGateway: c.Bool("api") || c.Bool("api-http"), + Metrics: c.Bool("metrics") || c.Bool("metrics"), }) } cliApp.Run(os.Args) @@ -163,6 +164,10 @@ func createCli() *cli.App { Name: "api-admin", Usage: "Serve the admin gRPC api", }, + cli.BoolFlag{ + Name: "metrics", + Usage: "Serve prometheus metrics", + }, } return cliApp diff --git a/glide.lock b/glide.lock index c817ad75..c3995245 100644 --- a/glide.lock +++ b/glide.lock @@ -1,5 +1,5 @@ -hash: a8e090c9a44f14974784e926e6cf1e51f05963db807d22cf4278cf21763e6146 -updated: 2018-06-02T13:28:44.604586+02:00 +hash: 8dbde2ffd1ec9184f56449c67b2dfcd9ec2c77dac803c05fa9f26fb052be4340 +updated: 2018-06-06T09:01:22.179086+02:00 imports: - name: cloud.google.com/go version: 3b1ae45394a234c385be014e9a488f2bb6eef821 @@ -13,36 +13,30 @@ imports: - autorest/adal - autorest/azure - autorest/date +- name: github.com/beorn7/perks + version: 3a771d992973f24aa725d07868b467d1ddfceafb + subpackages: + - quantile - name: github.com/davecgh/go-spew version: 346938d642f2ec3594ed81d874461961cd0faa76 subpackages: - spew - name: github.com/dgrijalva/jwt-go version: 01aeca54ebda6e0fbfafd0a524d234159c05ec20 -- name: github.com/docker/distribution - version: cd27f179f2c10c5d300e6d09025b538c475b0d51 - subpackages: - - digest - - reference - name: github.com/docker/spdystream version: 449fdfce4d962303d702fec724ef0ad181c92528 subpackages: - spdy -- name: github.com/emicklei/go-restful - version: ff4f55a206334ef123e4f79bbf348980da81ca46 - subpackages: - - log -- name: github.com/emicklei/go-restful-swagger12 - version: dcef7f55730566d41eae5db10e7d6981829720f6 - name: github.com/fatih/structs version: a720dfa8df582c51dee1b36feabb906bde1588bd - name: github.com/fission/fission - version: 6a3e326f36164e589e14c4898e5c097457c9db8d + version: b782df4d5a69b0db21992bafc365456c880516f8 subpackages: - cache - controller/client - crd - executor/client + - pkg/apis/fission.io/v1 - router - name: github.com/ghodss/yaml version: 73d445a93680fa1a78ae23a5839bad48f32ba1ee @@ -50,10 +44,6 @@ imports: version: 7222828b8ce19afee3c595aef6643b9e42150120 - name: github.com/go-openapi/errors version: 49fe8b3a0e0d32a617d8d50c67f856ad6e45b28b -- name: github.com/go-openapi/jsonpointer - version: 46af16f9f7b149af66e5d1bd010e3574dc06de98 -- name: github.com/go-openapi/jsonreference - version: 13c6e3589ad90f49bd3e3bbe2c2cb3d7a4142272 - name: github.com/go-openapi/loads version: 315567415dfd74b651f7a62cabfc82a57ed7b9ad - name: github.com/go-openapi/spec @@ -86,6 +76,21 @@ imports: - ptypes/wrappers - name: github.com/google/gofuzz version: 44d81051d367757e1c7c6a5a86423ece9afcf63c +- name: github.com/googleapis/gnostic + version: 0c5108395e2debce0d731cf0287ddf7242066aba + subpackages: + - OpenAPIv2 + - compiler + - extensions +- name: github.com/gophercloud/gophercloud + version: 6da026c32e2d622cc242d32984259c77237aefe1 + subpackages: + - openstack + - openstack/identity/v2/tenants + - openstack/identity/v2/tokens + - openstack/identity/v3/tokens + - openstack/utils + - pagination - name: github.com/gorilla/context version: 08b5f424b9271eedf6f9f0ce86cb9396ed337a42 - name: github.com/gorilla/handlers @@ -110,21 +115,19 @@ imports: version: bf9dde6d0d2c004a008c27aaee91170c786f6db8 - name: github.com/imdario/mergo version: 9d5f1277e9a8ed20c3684bda8fde67c05628518c -- name: github.com/juju/ratelimit - version: 5b9ff866471762aa2ab2dced63c9fb6f53921342 -- name: github.com/mailru/easyjson - version: d5b7844b561a7bc640052f1b935f7b800330d7e0 - subpackages: - - buffer - - jlexer - - jwriter +- name: github.com/json-iterator/go + version: 13f86432b882000a51c6e610c620974462691a97 +- name: github.com/matttproud/golang_protobuf_extensions + version: c12348ce28de40eed0136aa2b644d0ee0650e56c + subpackages: + - pbutil - name: github.com/nats-io/go-nats version: d66cb54e6b7bdd93f0b28afc8450d84c780dfb68 subpackages: - encoders/builtin - util - name: github.com/nats-io/go-nats-streaming - version: 6e620057a207bd61e992c1c5b6a2de7b6a4cb010 + version: e15a53f85e4932540600a16b56f6c4f65f58176f subpackages: - pb - name: github.com/nats-io/nats-streaming-server @@ -136,10 +139,25 @@ imports: version: 289cccf02c178dc782430d534e3c1f5b72af807f - name: github.com/pkg/errors version: 645ef00459ed84a119197bfb8d8205042c6df63d -- name: github.com/PuerkitoBio/purell - version: 8a290539e2e8629dbc4e6bad948158f790ec31f4 -- name: github.com/PuerkitoBio/urlesc - version: 5bd2802263f21d8788851d5305584c82a5c75d7e +- name: github.com/prometheus/client_golang + version: c5b7fccd204277076155f10851dad72b76a49317 + subpackages: + - prometheus + - prometheus/promhttp +- name: github.com/prometheus/client_model + version: 99fa1f4be8e564e8a6b613da7fa6f46c9edafc6c + subpackages: + - go +- name: github.com/prometheus/common + version: 61f87aac8082fa8c3c5655c7608d7478d46ac2ad + subpackages: + - expfmt + - internal/bitbucket.org/ww/goautoneg + - model +- name: github.com/prometheus/procfs + version: e645f4e5aaa8506fc71d6edbc5c4ff02c04c46f2 + subpackages: + - xfs - name: github.com/robertkrimen/otto version: 6c383dd335ef8dcccef05e651ce1eccfe4d0f011 subpackages: @@ -157,11 +175,7 @@ imports: - name: github.com/sirupsen/logrus version: c155da19408a8799da419ed3eeb0cb5db0ad5dbc - name: github.com/spf13/pflag - version: 9ff6c6923cfffbcd502984b8e0c80539a94968b7 -- name: github.com/ugorji/go - version: ded73eae5db7e7a0ef6f55aace87a2873c5d2b74 - subpackages: - - codec + version: 583c0c0531f06d5278b7d917446061adc344b5cd - name: github.com/urfave/cli version: 0bdeddeeb0f650497d603c4ad7b20cfe685682f6 - name: golang.org/x/crypto @@ -198,17 +212,14 @@ imports: - name: golang.org/x/text version: 88f656faf3f37f690df1a32515b479415e1a6769 subpackages: - - cases - - internal - - internal/tag - - language - - runes - secure/bidirule - - secure/precis - transform - unicode/bidi - unicode/norm - - width +- name: golang.org/x/time + version: f51c12702a4d776e4c1fa9b0fabab841babae631 + subpackages: + - rate - name: gonum.org/v1/gonum version: 996b88e8f8941c9defdbb338bea5e239a230e742 repo: http://github.com/gonum/gonum @@ -235,9 +246,7 @@ imports: - mat - name: google.golang.org/appengine version: 9d8544a6b2c7df9cff240fcf92d7b2f59bc13416 - repo: https://github.com/golang/appengine subpackages: - - cloudsql - internal - internal/app_identity - internal/base @@ -285,8 +294,39 @@ imports: - base64vlq - name: gopkg.in/yaml.v2 version: 53feefa2559fb8dfa8d81baad31be332c97d6c77 +- name: k8s.io/api + version: 8b7507fac302640dd5f1efbf9643199952cc58db + subpackages: + - admissionregistration/v1alpha1 + - admissionregistration/v1beta1 + - apps/v1 + - apps/v1beta1 + - apps/v1beta2 + - authentication/v1 + - authentication/v1beta1 + - authorization/v1 + - authorization/v1beta1 + - autoscaling/v1 + - autoscaling/v2beta1 + - batch/v1 + - batch/v1beta1 + - batch/v2alpha1 + - certificates/v1beta1 + - core/v1 + - events/v1beta1 + - extensions/v1beta1 + - networking/v1 + - policy/v1beta1 + - rbac/v1 + - rbac/v1alpha1 + - rbac/v1beta1 + - scheduling/v1alpha1 + - settings/v1alpha1 + - storage/v1 + - storage/v1alpha1 + - storage/v1beta1 - name: k8s.io/apiextensions-apiserver - version: 718d845dac8d60d16967cd230c1ea21265cc7f85 + version: 8e7f43002fec5394a8d96ebca781aa9d4b37aaef subpackages: - pkg/apis/apiextensions - pkg/apis/apiextensions/v1beta1 @@ -294,24 +334,19 @@ imports: - pkg/client/clientset/clientset/scheme - pkg/client/clientset/clientset/typed/apiextensions/v1beta1 - name: k8s.io/apimachinery - version: 1cb2cdd78d38df243e686d1b572b76e190469842 + version: 17529ec7eadb8de8e7dc835201455f53571f655a subpackages: - - pkg/api/equality - pkg/api/errors - pkg/api/meta - pkg/api/resource - - pkg/apimachinery - - pkg/apimachinery/announced - - pkg/apimachinery/registered + - pkg/apis/meta/internalversion - pkg/apis/meta/v1 - pkg/apis/meta/v1/unstructured - - pkg/apis/meta/v1alpha1 + - pkg/apis/meta/v1beta1 - pkg/conversion - pkg/conversion/queryparams - - pkg/conversion/unstructured - pkg/fields - pkg/labels - - pkg/openapi - pkg/runtime - pkg/runtime/schema - pkg/runtime/serializer @@ -332,7 +367,6 @@ imports: - pkg/util/intstr - pkg/util/json - pkg/util/net - - pkg/util/rand - pkg/util/remotecommand - pkg/util/runtime - pkg/util/sets @@ -345,75 +379,48 @@ imports: - third_party/forked/golang/netutil - third_party/forked/golang/reflect - name: k8s.io/client-go - version: d92e8497f71b7b4e0494e5bd204b48d34bd6f254 + version: 23781f4d6632d88e869066eaebb743857aa1ef9b subpackages: - discovery - kubernetes - kubernetes/scheme - kubernetes/typed/admissionregistration/v1alpha1 + - kubernetes/typed/admissionregistration/v1beta1 + - kubernetes/typed/apps/v1 - kubernetes/typed/apps/v1beta1 + - kubernetes/typed/apps/v1beta2 - kubernetes/typed/authentication/v1 - kubernetes/typed/authentication/v1beta1 - kubernetes/typed/authorization/v1 - kubernetes/typed/authorization/v1beta1 - kubernetes/typed/autoscaling/v1 - - kubernetes/typed/autoscaling/v2alpha1 + - kubernetes/typed/autoscaling/v2beta1 - kubernetes/typed/batch/v1 + - kubernetes/typed/batch/v1beta1 - kubernetes/typed/batch/v2alpha1 - kubernetes/typed/certificates/v1beta1 - kubernetes/typed/core/v1 + - kubernetes/typed/events/v1beta1 - kubernetes/typed/extensions/v1beta1 - kubernetes/typed/networking/v1 - kubernetes/typed/policy/v1beta1 + - kubernetes/typed/rbac/v1 - kubernetes/typed/rbac/v1alpha1 - kubernetes/typed/rbac/v1beta1 + - kubernetes/typed/scheduling/v1alpha1 - kubernetes/typed/settings/v1alpha1 - kubernetes/typed/storage/v1 + - kubernetes/typed/storage/v1alpha1 - kubernetes/typed/storage/v1beta1 - - pkg/api - - pkg/api/v1 - - pkg/api/v1/ref - - pkg/apis/admissionregistration - - pkg/apis/admissionregistration/v1alpha1 - - pkg/apis/apps - - pkg/apis/apps/v1beta1 - - pkg/apis/authentication - - pkg/apis/authentication/v1 - - pkg/apis/authentication/v1beta1 - - pkg/apis/authorization - - pkg/apis/authorization/v1 - - pkg/apis/authorization/v1beta1 - - pkg/apis/autoscaling - - pkg/apis/autoscaling/v1 - - pkg/apis/autoscaling/v2alpha1 - - pkg/apis/batch - - pkg/apis/batch/v1 - - pkg/apis/batch/v2alpha1 - - pkg/apis/certificates - - pkg/apis/certificates/v1beta1 - - pkg/apis/extensions - - pkg/apis/extensions/v1beta1 - - pkg/apis/networking - - pkg/apis/networking/v1 - - pkg/apis/policy - - pkg/apis/policy/v1beta1 - - pkg/apis/rbac - - pkg/apis/rbac/v1alpha1 - - pkg/apis/rbac/v1beta1 - - pkg/apis/settings - - pkg/apis/settings/v1alpha1 - - pkg/apis/storage - - pkg/apis/storage/v1 - - pkg/apis/storage/v1beta1 - - pkg/labels - - pkg/util - - pkg/util/intstr - - pkg/util/parsers + - pkg/apis/clientauthentication + - pkg/apis/clientauthentication/v1alpha1 - pkg/version - plugin/pkg/client/auth - plugin/pkg/client/auth/azure + - plugin/pkg/client/auth/exec - plugin/pkg/client/auth/gcp - plugin/pkg/client/auth/oidc + - plugin/pkg/client/auth/openstack - rest - rest/watch - third_party/forked/golang/template @@ -424,15 +431,21 @@ imports: - tools/clientcmd/api/latest - tools/clientcmd/api/v1 - tools/metrics + - tools/pager - tools/portforward + - tools/reference - tools/remotecommand - transport + - transport/spdy + - util/buffer - util/cert - util/exec - util/flowcontrol - util/homedir - util/integer + - util/intstr - util/jsonpath + - util/retry testImports: - name: github.com/pmezard/go-difflib version: d8ed2627bdf02c080bf22230dbb337003b7aba2d diff --git a/glide.yaml b/glide.yaml index c9e18d2f..d49e5034 100644 --- a/glide.yaml +++ b/glide.yaml @@ -61,6 +61,8 @@ import: version: v1.0 - package: github.com/pkg/errors version: ^0.8.0 +- package: github.com/prometheus/client_golang + version: 0.8.0 testImport: - package: github.com/stretchr/testify version: 1.1.4 diff --git a/pkg/fes/backend/nats/client.go b/pkg/fes/backend/nats/client.go index 62de63da..feef81fd 100644 --- a/pkg/fes/backend/nats/client.go +++ b/pkg/fes/backend/nats/client.go @@ -9,6 +9,7 @@ import ( "github.com/golang/protobuf/proto" nats "github.com/nats-io/go-nats" "github.com/nats-io/go-nats-streaming" + "github.com/prometheus/client_golang/prometheus" "github.com/sirupsen/logrus" ) @@ -16,6 +17,27 @@ const ( defaultClient = "fes" ) +var ( + subsActive = prometheus.NewGauge(prometheus.GaugeOpts{ + Namespace: "fes", + Subsystem: "nats", + Name: "subs_active", + Help: "Number of active subscriptions to NATS subjects.", + }) + + eventsAppended = prometheus.NewCounter(prometheus.CounterOpts{ + Namespace: "fes", + Subsystem: "nats", + Name: "events_appended_total", + Help: "Count of appended events (excluding any internal events).", + }) +) + +func init() { + prometheus.MustRegister(subsActive) + prometheus.MustRegister(eventsAppended) +} + type EventStore struct { pubsub.Publisher conn *WildcardConn @@ -92,11 +114,17 @@ func (es *EventStore) Watch(aggregate fes.Aggregate) error { logrus.Infof("Backend client watches:' %s'", subject) es.sub[aggregate] = sub + subsActive.Inc() return nil } func (es *EventStore) Close() error { - return es.conn.Close() + err := es.conn.Close() + if err != nil { + return err + } + subsActive.Dec() + return nil } func (es *EventStore) Append(event *fes.Event) error { @@ -116,7 +144,12 @@ func (es *EventStore) Append(event *fes.Event) error { "nats.subject": subject, }).Infof("Appending event: %v", event.Type) - return es.conn.Publish(subject, data) + err = es.conn.Publish(subject, data) + if err != nil { + return err + } + eventsAppended.Inc() + return nil } func (es *EventStore) Get(aggregate *fes.Aggregate) ([]*fes.Event, error) { diff --git a/pkg/fes/backend/nats/nats.go b/pkg/fes/backend/nats/nats.go index e2ccf6ba..060c4a52 100644 --- a/pkg/fes/backend/nats/nats.go +++ b/pkg/fes/backend/nats/nats.go @@ -113,7 +113,11 @@ func (cn *Conn) MsgSeqRange(subject string, seqStart uint64, seqEnd uint64) ([]* if err != nil { return nil, err } - defer sub.Close() + subsActive.Inc() + defer func() { + sub.Close() + subsActive.Dec() + }() for { select { @@ -138,6 +142,7 @@ func NewWildcardConn(conn stan.Conn) *WildcardConn { func (wc *WildcardConn) Subscribe(wildcardSubject string, cb stan.MsgHandler, opts ...stan.SubscriptionOption) (stan.Subscription, error) { if !hasWildcard(wildcardSubject) { + subsActive.Inc() return wc.Conn.Subscribe(wildcardSubject, cb, opts...) } @@ -183,10 +188,11 @@ func (wc *WildcardConn) Subscribe(wildcardSubject string, cb stan.MsgHandler, op case deleted: // Delete the current listener of the subject of the event if _, ok := ws.sources[subject]; ok { - err := ws.sources[subject].Close() + err := ws.sources[subject].Unsubscribe() if err != nil { logrus.Errorf("Failed to close (sub)listener: %v", err) } + subsActive.Dec() } default: panic(fmt.Sprintf("Unknown eventType: %v", subjectEvent)) @@ -287,6 +293,7 @@ func (ws *WildcardSub) Unsubscribe() error { err := ws.activitySub.Unsubscribe() for id, source := range ws.sources { err = source.Unsubscribe() + subsActive.Dec() delete(ws.sources, id) } return err diff --git a/pkg/fes/caches.go b/pkg/fes/caches.go index caadcfbf..a0ef9101 100644 --- a/pkg/fes/caches.go +++ b/pkg/fes/caches.go @@ -8,6 +8,7 @@ import ( "github.com/fission/fission-workflows/pkg/util/pubsub" "github.com/golang/protobuf/ptypes" + "github.com/prometheus/client_golang/prometheus" "github.com/sirupsen/logrus" ) @@ -20,8 +21,21 @@ const ( var ( ErrNotFound = errors.New("could not find entity") + + cacheCount = prometheus.NewGauge(prometheus.GaugeOpts{ + Namespace: "fes", + Subsystem: "cache", + Name: "current_cache_counts", + Help: "The current number of entries in the caches", + }) + + // TODO add metrics: cache size, access latencies, update latencies (from event creation -> notification) ) +func init() { + prometheus.MustRegister(cacheCount) +} + // MapCache provides a simple non-preempting map-based CacheReaderWriter implementation. type MapCache struct { contents map[string]map[string]Aggregator // Map: AggregateType -> AggregateId -> entity @@ -93,8 +107,8 @@ func (rc *MapCache) Put(entity Aggregator) error { if _, ok := rc.contents[ref.Type]; !ok { rc.contents[ref.Type] = map[string]Aggregator{} } - rc.contents[ref.Type][ref.Id] = entity + cacheCount.Inc() return nil } @@ -102,6 +116,7 @@ func (rc *MapCache) Invalidate(ref *Aggregate) { rc.lock.Lock() defer rc.lock.Unlock() delete(rc.contents[ref.Type], ref.Id) + cacheCount.Dec() } func (rc *MapCache) List() []Aggregate { diff --git a/pkg/fnenv/native/native.go b/pkg/fnenv/native/native.go index 73ecef41..1ccf1331 100644 --- a/pkg/fnenv/native/native.go +++ b/pkg/fnenv/native/native.go @@ -4,10 +4,13 @@ package native import ( "fmt" "runtime/debug" + "time" "github.com/fission/fission-workflows/pkg/types" "github.com/fission/fission-workflows/pkg/types/validate" "github.com/golang/protobuf/ptypes" + + "github.com/prometheus/client_golang/prometheus" log "github.com/sirupsen/logrus" ) @@ -15,6 +18,40 @@ const ( Name = "native" ) +var ( + fnActive = prometheus.NewGauge(prometheus.GaugeOpts{ + Namespace: "fnenv", + Subsystem: "native", + Name: "functions_active", + Help: "Number of function executions that are currently active", + }) + + fnCount = prometheus.NewCounter(prometheus.CounterOpts{ + Namespace: "fnenv", + Subsystem: "native", + Name: "functions_execution_total", + Help: "Total number of function executions", + }) + + fnResolved = prometheus.NewCounter(prometheus.CounterOpts{ + Namespace: "fnenv", + Subsystem: "native", + Name: "functions_resolved_total", + Help: "Total number of function resolved", + }) + + fnExecTime = prometheus.NewSummary(prometheus.SummaryOpts{ + Namespace: "fnenv", + Subsystem: "native", + Name: "function_execution_time_milliseconds", + Help: "Execution time summary of the internal functions", + }) +) + +func init() { + prometheus.MustRegister(fnActive, fnResolved, fnExecTime, fnCount) +} + // An InternalFunction is a function that will be executed in the same process as the invoker. type InternalFunction interface { Invoke(spec *types.TaskInvocationSpec) (*types.TypedValue, error) @@ -49,13 +86,17 @@ func (fe *FunctionEnv) Invoke(spec *types.TaskInvocationSpec) (*types.TaskInvoca return nil, err } + timeStart := time.Now() + defer fnExecTime.Observe(float64(time.Since(timeStart))) fnID := spec.FnRef.ID fn, ok := fe.fns[fnID] if !ok { return nil, fmt.Errorf("could not resolve internal function '%s'", fnID) } - + fnActive.Inc() out, err := fn.Invoke(spec) + fnActive.Dec() + fnCount.Inc() if err != nil { log.WithFields(log.Fields{ "fnID": fnID, @@ -78,6 +119,7 @@ func (fe *FunctionEnv) Invoke(spec *types.TaskInvocationSpec) (*types.TaskInvoca } func (fe *FunctionEnv) Resolve(fnName string) (string, error) { + fnResolved.Inc() _, ok := fe.fns[fnName] if !ok { return "", fmt.Errorf("could not resolve internal function '%s'", fnName) From 4395dfaaa02ef80aa7d841f6b7c7857b04d0eb83 Mon Sep 17 00:00:00 2001 From: erwinvaneyk Date: Thu, 7 Jun 2018 13:31:59 +0200 Subject: [PATCH 3/4] Add initial instrumentation --- build/runtime-env/Dockerfile | 4 +- cmd/fission-workflows-bundle/bundle/bundle.go | 13 +++- cmd/wfcli/portforward.go | 10 +-- glide.lock | 11 ++- glide.yaml | 4 +- pkg/apiserver/invocation.go | 44 +---------- pkg/controller/controller.go | 34 +++++++++ pkg/controller/invocation/actions.go | 44 +++++++---- pkg/controller/invocation/controller.go | 58 ++++++++++++++- pkg/controller/workflow/controller.go | 73 ++++++++++++------- pkg/fes/backend/mem/mem.go | 9 +++ pkg/fes/backend/nats/client.go | 50 ++++++++----- pkg/fes/backend/nats/nats.go | 9 +-- pkg/fes/caches.go | 23 ++++-- pkg/fnenv/fission/envproxy.go | 1 + pkg/fnenv/fission/resolver.go | 1 - pkg/fnenv/fission/runtime.go | 10 +++ pkg/fnenv/fission/timed.go | 1 + pkg/fnenv/fnenv.go | 26 +++++++ pkg/fnenv/native/native.go | 45 ++---------- pkg/fnenv/resolver.go | 16 ++++ pkg/fnenv/workflows/workflows.go | 18 ++++- pkg/scheduler/scheduler.go | 26 +++++++ pkg/types/extensions.go | 4 + 24 files changed, 361 insertions(+), 173 deletions(-) diff --git a/build/runtime-env/Dockerfile b/build/runtime-env/Dockerfile index dc565581..097a7ed1 100644 --- a/build/runtime-env/Dockerfile +++ b/build/runtime-env/Dockerfile @@ -10,6 +10,7 @@ FROM scratch COPY --from=workflows-bundle /fission-workflows-bundle /fission-workflows-bundle EXPOSE 8888 +EXPOSE 8080 ENV FNENV_FISSION_CONTROLLER http://controller.fission ENV FNENV_FISSION_EXECUTOR http://executor.fission @@ -25,4 +26,5 @@ ENTRYPOINT ["/fission-workflows-bundle", \ "--api-http", \ "--api-workflow-invocation", \ "--api-workflow", \ - "--api-admin"] + "--api-admin", \ + "--metrics"] \ No newline at end of file diff --git a/cmd/fission-workflows-bundle/bundle/bundle.go b/cmd/fission-workflows-bundle/bundle/bundle.go index 0e9f340b..9f840443 100644 --- a/cmd/fission-workflows-bundle/bundle/bundle.go +++ b/cmd/fission-workflows-bundle/bundle/bundle.go @@ -29,6 +29,7 @@ import ( controllerc "github.com/fission/fission/controller/client" executor "github.com/fission/fission/executor/client" "github.com/gorilla/handlers" + "github.com/grpc-ecosystem/go-grpc-prometheus" grpcruntime "github.com/grpc-ecosystem/grpc-gateway/runtime" "github.com/prometheus/client_golang/prometheus/promhttp" log "github.com/sirupsen/logrus" @@ -67,7 +68,10 @@ func Run(ctx context.Context, opts *Options) error { var es fes.Backend var esPub pubsub.Publisher - grpcServer := grpc.NewServer() + grpcServer := grpc.NewServer( + grpc.StreamInterceptor(grpc_prometheus.StreamServerInterceptor), + grpc.UnaryInterceptor(grpc_prometheus.UnaryServerInterceptor), + ) defer grpcServer.GracefulStop() // Event Stores @@ -164,6 +168,9 @@ func Run(ctx context.Context, opts *Options) error { } if opts.AdminAPI || opts.WorkflowAPI || opts.InvocationAPI { + log.Info("Instrumenting gRPC server with Prometheus metrics") + grpc_prometheus.Register(grpcServer) + lis, err := net.Listen("tcp", gRPCAddress) if err != nil { log.Fatalf("failed to listen: %v", err) @@ -291,7 +298,7 @@ func setupWorkflowInvocationCache(ctx context.Context, invocationEventPub pubsub return aggregates.NewWorkflowInvocation("") } - return fes.NewSubscribedCache(ctx, fes.NewMapCache(), wi, invokeSub) + return fes.NewSubscribedCache(ctx, fes.NewNamedMapCache("invocation"), wi, invokeSub) } func setupWorkflowCache(ctx context.Context, workflowEventPub pubsub.Publisher) *fes.SubscribedCache { @@ -302,7 +309,7 @@ func setupWorkflowCache(ctx context.Context, workflowEventPub pubsub.Publisher) wb := func() fes.Aggregator { return aggregates.NewWorkflow("") } - return fes.NewSubscribedCache(ctx, fes.NewMapCache(), wb, wfSub) + return fes.NewSubscribedCache(ctx, fes.NewNamedMapCache("workflow"), wb, wfSub) } func serveAdminAPI(s *grpc.Server) { diff --git a/cmd/wfcli/portforward.go b/cmd/wfcli/portforward.go index d16303d2..2ed9e740 100644 --- a/cmd/wfcli/portforward.go +++ b/cmd/wfcli/portforward.go @@ -18,7 +18,7 @@ import ( _ "k8s.io/client-go/plugin/pkg/client/auth/gcp" "k8s.io/client-go/tools/clientcmd" "k8s.io/client-go/tools/portforward" - "k8s.io/client-go/tools/remotecommand" + "k8s.io/client-go/transport/spdy" ) type client struct { @@ -118,8 +118,7 @@ func runPortForward(kubeConfig string, labelSelector string, localPort string, f } // get the pod; if there is more than one, ask the user to disambiguate - podList, err := clientset.CoreV1().Pods(fissionNamespace). - List(meta_v1.ListOptions{LabelSelector: labelSelector}) + podList, err := clientset.CoreV1().Pods(fissionNamespace).List(meta_v1.ListOptions{LabelSelector: labelSelector}) if err != nil || len(podList.Items) == 0 { panic("Error getting controller pod for port-forwarding") } @@ -158,7 +157,7 @@ func runPortForward(kubeConfig string, labelSelector string, localPort string, f readyChannel := make(chan struct{}) // create request URL - req := clientset.CoreV1Client.RESTClient().Post().Resource("pods"). + req := clientset.CoreV1().RESTClient().Post().Resource("pods"). Namespace(podNameSpace).Name(podName).SubResource("portforward") url := req.URL() @@ -167,11 +166,12 @@ func runPortForward(kubeConfig string, labelSelector string, localPort string, f ports := []string{portCombo} // actually start the port-forwarding process here - dialer, err := remotecommand.NewExecutor(config, "POST", url) + transport, upgrader, err := spdy.RoundTripperFor(config) if err != nil { msg := fmt.Sprintf("newexecutor errored out :%v", err.Error()) panic(msg) } + dialer := spdy.NewDialer(upgrader, &http.Client{Transport: transport}, "POST", url) fw, err := portforward.New(dialer, ports, stopChannel, readyChannel, nil, os.Stderr) if err != nil { diff --git a/glide.lock b/glide.lock index c3995245..e017f6e9 100644 --- a/glide.lock +++ b/glide.lock @@ -1,5 +1,5 @@ -hash: 8dbde2ffd1ec9184f56449c67b2dfcd9ec2c77dac803c05fa9f26fb052be4340 -updated: 2018-06-06T09:01:22.179086+02:00 +hash: ba75f2d0e6fb6979dd369812eac5b97d27a8cd3e03bcd4ff4b21509895366313 +updated: 2018-06-07T13:10:54.09849+02:00 imports: - name: cloud.google.com/go version: 3b1ae45394a234c385be014e9a488f2bb6eef821 @@ -97,6 +97,8 @@ imports: version: 90663712d74cb411cbef281bc1e08c19d1a76145 - name: github.com/gorilla/mux version: e3702bed27f0d39777b0b37b664b6280e8ef8fbf +- name: github.com/grpc-ecosystem/go-grpc-prometheus + version: c225b8c3b01faf2899099b768856a9e916e5087b - name: github.com/grpc-ecosystem/grpc-gateway version: 58f78b988bc393694cef62b92c5cde77e4742ff5 subpackages: @@ -127,7 +129,7 @@ imports: - encoders/builtin - util - name: github.com/nats-io/go-nats-streaming - version: e15a53f85e4932540600a16b56f6c4f65f58176f + version: 6e620057a207bd61e992c1c5b6a2de7b6a4cb010 subpackages: - pb - name: github.com/nats-io/nats-streaming-server @@ -367,7 +369,6 @@ imports: - pkg/util/intstr - pkg/util/json - pkg/util/net - - pkg/util/remotecommand - pkg/util/runtime - pkg/util/sets - pkg/util/validation @@ -434,12 +435,10 @@ imports: - tools/pager - tools/portforward - tools/reference - - tools/remotecommand - transport - transport/spdy - util/buffer - util/cert - - util/exec - util/flowcontrol - util/homedir - util/integer diff --git a/glide.yaml b/glide.yaml index d49e5034..bfad3119 100644 --- a/glide.yaml +++ b/glide.yaml @@ -21,7 +21,7 @@ import: - package: github.com/nats-io/nuid version: ~1.0.0 - package: github.com/nats-io/go-nats-streaming - version: ^v0.3.4 + version: 6e620057a207bd61e992c1c5b6a2de7b6a4cb010 - package: github.com/robertkrimen/otto - package: gopkg.in/yaml.v2 - package: golang.org/x/sync @@ -63,6 +63,8 @@ import: version: ^0.8.0 - package: github.com/prometheus/client_golang version: 0.8.0 +- package: github.com/grpc-ecosystem/go-grpc-prometheus + version: ^1.2.0 testImport: - package: github.com/stretchr/testify version: 1.1.4 diff --git a/pkg/apiserver/invocation.go b/pkg/apiserver/invocation.go index 177df732..66e5f10b 100644 --- a/pkg/apiserver/invocation.go +++ b/pkg/apiserver/invocation.go @@ -3,10 +3,10 @@ package apiserver import ( "errors" "strings" - "time" "github.com/fission/fission-workflows/pkg/api" "github.com/fission/fission-workflows/pkg/fes" + "github.com/fission/fission-workflows/pkg/fnenv/workflows" "github.com/fission/fission-workflows/pkg/types" "github.com/fission/fission-workflows/pkg/types/aggregates" "github.com/fission/fission-workflows/pkg/types/validate" @@ -15,14 +15,10 @@ import ( "golang.org/x/net/context" ) -const ( - invokeSyncTimeout = time.Duration(10) * time.Minute - invokeSyncPollingInterval = time.Duration(100) * time.Millisecond -) - type Invocation struct { api *api.Invocation wfiCache fes.CacheReader + fnenv *workflows.Runtime } func (gi *Invocation) Validate(ctx context.Context, spec *types.WorkflowInvocationSpec) (*empty.Empty, error) { @@ -35,7 +31,7 @@ func (gi *Invocation) Validate(ctx context.Context, spec *types.WorkflowInvocati } func NewInvocation(api *api.Invocation, wfiCache fes.CacheReader) WorkflowInvocationAPIServer { - return &Invocation{api, wfiCache} + return &Invocation{api, wfiCache, workflows.NewRuntime(api, wfiCache)} } func (gi *Invocation) Invoke(ctx context.Context, spec *types.WorkflowInvocationSpec) (*WorkflowInvocationIdentifier, error) { @@ -48,39 +44,7 @@ func (gi *Invocation) Invoke(ctx context.Context, spec *types.WorkflowInvocation } func (gi *Invocation) InvokeSync(ctx context.Context, spec *types.WorkflowInvocationSpec) (*types.WorkflowInvocation, error) { - wfiID, err := gi.api.Invoke(spec) - if err != nil { - logrus.Errorf("Failed to invoke workflow: %v", err) - return nil, err - } - - timeout, _ := context.WithTimeout(ctx, invokeSyncTimeout) - var result *types.WorkflowInvocation - for { - wi := aggregates.NewWorkflowInvocation(wfiID) - err := gi.wfiCache.Get(wi) - if err != nil { - logrus.Warnf("Failed to get workflow invocation from cache: %v", err) - } - if wi != nil && wi.GetStatus() != nil && wi.GetStatus().Finished() { - result = wi.WorkflowInvocation - break - } - - select { - case <-timeout.Done(): - err := gi.api.Cancel(wfiID) - if err != nil { - logrus.Errorf("Failed to cancel workflow invocation: %v", err) - } - return nil, errors.New("timeout occurred") - default: - // TODO polling is a temporary shortcut; needs optimizing. - time.Sleep(invokeSyncPollingInterval) - } - } - - return result, nil + return gi.fnenv.InvokeWorkflow(spec) } func (gi *Invocation) Cancel(ctx context.Context, invocationID *WorkflowInvocationIdentifier) (*empty.Empty, error) { diff --git a/pkg/controller/controller.go b/pkg/controller/controller.go index 8876c0e6..ecdddd80 100644 --- a/pkg/controller/controller.go +++ b/pkg/controller/controller.go @@ -8,6 +8,7 @@ import ( "time" "github.com/fission/fission-workflows/pkg/fes" + "github.com/prometheus/client_golang/prometheus" "github.com/sirupsen/logrus" ) @@ -18,8 +19,41 @@ const ( var ( log = logrus.New().WithFields(logrus.Fields{"component": "controller"}) metaLog = log.WithField("controller", "controller-meta") + + // Controller-related metrics + EvalJobs = prometheus.NewCounterVec(prometheus.CounterOpts{ + Namespace: "workflows", + Subsystem: "controller_workflow", + Name: "eval_job", + Help: "Count of the different statuses of evaluations (e.g. skipped, errored, action).", + }, []string{"controller", "status"}) + + EvalRecovered = prometheus.NewCounterVec(prometheus.CounterOpts{ + Namespace: "workflows", + Subsystem: "controller_workflow", + Name: "eval_recovered", + Help: "Count of the number of jobs that were lost and recovered", + }, []string{"controller", "from"}) + + EvalDuration = prometheus.NewSummaryVec(prometheus.SummaryOpts{ + Namespace: "workflows", + Subsystem: "controller_workflow", + Name: "eval_duration", + Help: "Duration of an evaluation.", + }, []string{"controller", "action"}) + + EvalQueueSize = prometheus.NewGaugeVec(prometheus.GaugeOpts{ + Namespace: "workflows", + Subsystem: "controller_workflow", + Name: "eval_queue_size", + Help: "A gauge of the evaluation queue size", + }, []string{"controller"}) ) +func init() { + prometheus.MustRegister(EvalJobs, EvalDuration, EvalQueueSize, EvalRecovered) +} + type Controller interface { Init(ctx context.Context) error Tick(tick uint64) error diff --git a/pkg/controller/invocation/actions.go b/pkg/controller/invocation/actions.go index 55ba8b14..08b68de8 100644 --- a/pkg/controller/invocation/actions.go +++ b/pkg/controller/invocation/actions.go @@ -2,6 +2,7 @@ package invocation import ( "fmt" + "time" "github.com/fission/fission-workflows/pkg/api" "github.com/fission/fission-workflows/pkg/controller" @@ -74,24 +75,11 @@ func (a *ActionInvokeTask) Eval(cec controller.EvalContext) controller.Action { panic("not implemented") } -func (a *ActionInvokeTask) Apply() error { - wfiLog.Infof("Running task: %v", a.Task.Id) - // Find Task (static or dynamic) - task, ok := types.GetTask(a.Wf, a.Wfi, a.Task.Id) - if !ok { - return fmt.Errorf("task '%v' could not be found", a.Wfi.ID()) - } - wfiLog.Infof("Invoking function '%s' for Task '%s'", task.Spec.FunctionRef, a.Task.Id) - - // Check if resolved - if task.Status.FnRef == nil { - return fmt.Errorf("no resolved Task could be found for FunctionRef '%v'", task.Spec.FunctionRef) - } - +func (a *ActionInvokeTask) resolveInputs() (map[string]*types.TypedValue, error) { // Resolve the inputs scope, err := expr.NewScope(a.Wf, a.Wfi) if err != nil { - return errors.Wrapf(err, "failed to create scope for task '%v'", a.Task.Id) + return nil, errors.Wrapf(err, "failed to create scope for task '%v'", a.Task.Id) } a.StateStore.Set(a.Wfi.ID(), scope) @@ -113,7 +101,7 @@ func (a *ActionInvokeTask) Apply() error { "val": input.Key, "key": input.Val, }).Errorf("Failed to resolve input: %v", err) - return err + return nil, err } inputs[input.Key] = resolvedInput @@ -124,6 +112,30 @@ func (a *ActionInvokeTask) Apply() error { // Update the scope with the resolved type scope.Tasks[a.Task.Id].Inputs[input.Key] = typedvalues.MustFormat(resolvedInput) } + return inputs, nil +} + +func (a *ActionInvokeTask) Apply() error { + wfiLog.Infof("Running task: %v", a.Task.Id) + // Find Task (static or dynamic) + task, ok := types.GetTask(a.Wf, a.Wfi, a.Task.Id) + if !ok { + return fmt.Errorf("task '%v' could not be found", a.Wfi.ID()) + } + wfiLog.Infof("Invoking function '%s' for Task '%s'", task.Spec.FunctionRef, a.Task.Id) + + // Check if function has been resolved + if task.Status.FnRef == nil { + return fmt.Errorf("no resolved Task could be found for FunctionRef '%v'", task.Spec.FunctionRef) + } + + // Resolve inputs + exprEvalStart := time.Now() + inputs, err := a.resolveInputs() + exprEvalDuration.Observe(float64(time.Now().Sub(exprEvalStart))) + if err != nil { + return err + } // Invoke fnSpec := &types.TaskInvocationSpec{ diff --git a/pkg/controller/invocation/controller.go b/pkg/controller/invocation/controller.go index e436b5fa..4e581c2b 100644 --- a/pkg/controller/invocation/controller.go +++ b/pkg/controller/invocation/controller.go @@ -3,6 +3,7 @@ package invocation import ( "context" "errors" + "fmt" "time" "github.com/fission/fission-workflows/pkg/api" @@ -14,15 +15,46 @@ import ( "github.com/fission/fission-workflows/pkg/types/events" "github.com/fission/fission-workflows/pkg/util/labels" "github.com/fission/fission-workflows/pkg/util/pubsub" + "github.com/golang/protobuf/ptypes" + "github.com/prometheus/client_golang/prometheus" log "github.com/sirupsen/logrus" ) const ( - NotificationBuffer = 100 - evalQueueSize = 50 + NotificationBuffer = 100 + defaultEvalQueueSize = 50 + Name = "invocation" ) -var wfiLog = log.WithField("component", "controller-wi") +var ( + wfiLog = log.WithField("component", "controller-wi") + + // workflow-related metrics + invocationStatus = prometheus.NewCounterVec(prometheus.CounterOpts{ + Namespace: "workflows", + Subsystem: "controller_invocation", + Name: "status", + Help: "Count of the different statuses of workflow invocations.", + }, []string{"status"}) + + invocationDuration = prometheus.NewSummary(prometheus.SummaryOpts{ + Namespace: "workflows", + Subsystem: "controller_invocation", + Name: "finished_duration", + Help: "Duration of an invocation from start to a finished state.", + }) + + exprEvalDuration = prometheus.NewSummary(prometheus.SummaryOpts{ + Namespace: "workflows", + Subsystem: "controller_invocation", + Name: "expr_eval_duration", + Help: "Duration of the evaluation of the input expressions.", + }) +) + +func init() { + prometheus.MustRegister(invocationStatus, invocationDuration, exprEvalDuration) +} type Controller struct { invokeCache fes.CacheReader @@ -48,7 +80,7 @@ func NewController(invokeCache fes.CacheReader, wfCache fes.CacheReader, workflo scheduler: workflowScheduler, taskAPI: taskAPI, invocationAPI: invocationAPI, - evalQueue: make(chan string, evalQueueSize), + evalQueue: make(chan string, defaultEvalQueueSize), evalCache: controller.NewEvalCache(), stateStore: stateStore, @@ -93,6 +125,7 @@ func (cr *Controller) Init(sctx context.Context) error { select { case eval := <-cr.evalQueue: go cr.Evaluate(eval) // TODO limit number of goroutines + controller.EvalQueueSize.WithLabelValues("invocation").Dec() case <-ctx.Done(): return } @@ -173,6 +206,7 @@ func (cr *Controller) checkEvalCaches() error { reevaluateAt := last.Timestamp.Add(time.Duration(100) * time.Millisecond) if time.Now().UnixNano() > reevaluateAt.UnixNano() { + controller.EvalRecovered.WithLabelValues(Name, "evalStore").Inc() cr.submitEval(id) } } @@ -196,6 +230,7 @@ func (cr *Controller) checkModelCaches() error { } if !wi.Status.Finished() { + controller.EvalRecovered.WithLabelValues(Name, "cache").Inc() cr.submitEval(wi.ID()) } } @@ -206,6 +241,7 @@ func (cr *Controller) submitEval(ids ...string) bool { for _, id := range ids { select { case cr.evalQueue <- id: + controller.EvalQueueSize.WithLabelValues(Name).Inc() return true // ok default: @@ -217,6 +253,7 @@ func (cr *Controller) submitEval(ids ...string) bool { } func (cr *Controller) Evaluate(invocationID string) { + start := time.Now() // Fetch and attempt to claim the evaluation evalState := cr.evalCache.GetOrCreate(invocationID) select { @@ -225,6 +262,7 @@ func (cr *Controller) Evaluate(invocationID string) { default: // TODO provide option to wait for a lock wfiLog.Debugf("Failed to obtain access to invocation %s", invocationID) + controller.EvalJobs.WithLabelValues(Name, "duplicate").Inc() return } log.Debugf("evaluating invocation %s", invocationID) @@ -235,11 +273,13 @@ func (cr *Controller) Evaluate(invocationID string) { // TODO move to rule if err != nil && wfi.WorkflowInvocation == nil { log.Errorf("controller failed to get invocation for invocation id '%s': %v", invocationID, err) + controller.EvalJobs.WithLabelValues(Name, "error").Inc() return } // TODO move to rule if wfi.Status.Finished() { wfiLog.Debugf("No need to evaluate finished invocation %v", invocationID) + controller.EvalJobs.WithLabelValues(Name, "error").Inc() return } @@ -250,6 +290,7 @@ func (cr *Controller) Evaluate(invocationID string) { if err != nil && wf.Workflow == nil { log.Errorf("controller failed to get workflow '%s' for invocation id '%s': %v", wfi.Spec.WorkflowId, invocationID, err) + controller.EvalJobs.WithLabelValues(Name, "error").Inc() return } @@ -261,6 +302,7 @@ func (cr *Controller) Evaluate(invocationID string) { action := cr.evalPolicy.Eval(ec) record.Action = action if action == nil { + controller.EvalJobs.WithLabelValues(Name, "noop").Inc() return } @@ -270,9 +312,17 @@ func (cr *Controller) Evaluate(invocationID string) { log.Errorf("Action '%T' failed: %v", action, err) record.Error = err } + controller.EvalJobs.WithLabelValues(Name, "action").Inc() // Record this evaluation evalState.Record(record) + + controller.EvalDuration.WithLabelValues(Name, fmt.Sprintf("%T", action)).Observe(float64(time.Now().Sub(start))) + if wfi.GetStatus().Finished() { + t, _ := ptypes.Timestamp(wfi.GetMetadata().GetCreatedAt()) + invocationDuration.Observe(float64(time.Now().Sub(t))) + } + invocationStatus.WithLabelValues(wfi.GetStatus().GetStatus().String()).Inc() } func (cr *Controller) Close() error { diff --git a/pkg/controller/workflow/controller.go b/pkg/controller/workflow/controller.go index 699127c7..2afac441 100644 --- a/pkg/controller/workflow/controller.go +++ b/pkg/controller/workflow/controller.go @@ -4,6 +4,7 @@ import ( "context" "fmt" "reflect" + "time" "github.com/fission/fission-workflows/pkg/api" "github.com/fission/fission-workflows/pkg/controller" @@ -12,15 +13,40 @@ import ( "github.com/fission/fission-workflows/pkg/types/aggregates" "github.com/fission/fission-workflows/pkg/util/labels" "github.com/fission/fission-workflows/pkg/util/pubsub" + "github.com/golang/protobuf/ptypes" + "github.com/prometheus/client_golang/prometheus" "github.com/sirupsen/logrus" ) const ( - NotificationBuffer = 100 - evalQueueSize = 50 + NotificationBuffer = 100 + defaultEvalQueueSize = 50 + Name = "workflow" ) -var wfLog = logrus.WithField("component", "controller.wf") +// TODO add hard limits (cache size, max concurrent invocation) + +var ( + wfLog = logrus.WithField("component", "controller.wf") + + workflowProcessDuration = prometheus.NewSummary(prometheus.SummaryOpts{ + Namespace: "workflows", + Subsystem: "controller_workflow", + Name: "parsed_duration", + Help: "Duration of a workflow from a start to a parsed state.", + }) + + workflowStatus = prometheus.NewCounterVec(prometheus.CounterOpts{ + Namespace: "workflows", + Subsystem: "controller_workflow", + Name: "status", + Help: "Count of the different statuses of workflows.", + }, []string{"status"}) +) + +func init() { + prometheus.MustRegister(workflowProcessDuration, workflowStatus) +} // WorkflowController is the controller concerned with the lifecycle of workflows. It handles responsibilities, such as // parsing of workflows. @@ -38,7 +64,7 @@ func NewController(wfCache fes.CacheReader, wfAPI *api.Workflow) *Controller { ctr := &Controller{ wfCache: wfCache, api: wfAPI, - evalQueue: make(chan string, evalQueueSize), + evalQueue: make(chan string, defaultEvalQueueSize), evalCache: controller.NewEvalCache(), } ctr.evalPolicy = defaultPolicy(ctr) @@ -76,6 +102,7 @@ func (c *Controller) Init(sctx context.Context) error { for { select { case eval := <-c.evalQueue: + controller.EvalQueueSize.WithLabelValues(Name).Dec() go c.Evaluate(eval) // TODO limit number of goroutines case <-ctx.Done(): return @@ -98,29 +125,8 @@ func (c *Controller) handleMsg(msg pubsub.Msg) error { } func (c *Controller) Tick(tick uint64) error { - // Assume that all workflows are in evalCache - //now := time.Now() // TODO short loop: eval cache // TODO longer loop: cache - //for _, a := range c.wfCache.List() { - // if locked { - // continue - // } - // - // wfEntity, err := c.wfCache.GetAggregate(a) - // if err != nil { - // return fmt.Errorf("failed to retrieve: %v", err) - // } - // - // wf, ok := wfEntity.(*aggregates.Workflow) - // if !ok { - // wfLog.WithField("wfEntity", wfEntity).WithField("type", reflect.TypeOf(wfEntity)). - // Error("Unexpected type in wfCache") - // panic(fmt.Sprintf("unexpected type '%v' in wfCache", reflect.TypeOf(wfEntity))) - // } - // - // c.submitEval(wf.id()) - //} return nil } @@ -135,6 +141,7 @@ func (c *Controller) Notify(msg *fes.Notification) error { } func (c *Controller) Evaluate(workflowID string) { + start := time.Now() // Fetch and attempt to claim the evaluation evalState := c.evalCache.GetOrCreate(workflowID) select { @@ -143,6 +150,7 @@ func (c *Controller) Evaluate(workflowID string) { default: // TODO provide option to wait for a lock wfLog.Debugf("Failed to obtain access to workflow %s", workflowID) + controller.EvalJobs.WithLabelValues(Name, "duplicate").Inc() return } wfLog.Debugf("evaluating workflow %s", workflowID) @@ -153,6 +161,7 @@ func (c *Controller) Evaluate(workflowID string) { // TODO move to rule if err != nil && wf.Workflow == nil { logrus.Errorf("controller failed to get workflow '%s': %v", workflowID, err) + controller.EvalJobs.WithLabelValues(Name, "error").Inc() return } @@ -162,6 +171,11 @@ func (c *Controller) Evaluate(workflowID string) { ec := NewEvalContext(evalState, wf.Workflow) action := c.evalPolicy.Eval(ec) + if action == nil { + controller.EvalJobs.WithLabelValues(Name, "noop").Inc() + return + } + record.Action = action // Execute action @@ -170,9 +184,17 @@ func (c *Controller) Evaluate(workflowID string) { wfLog.Errorf("Action '%T' failed: %v", action, err) record.Error = err } + controller.EvalJobs.WithLabelValues(Name, "action").Inc() // Record this evaluation evalState.Record(record) + + controller.EvalDuration.WithLabelValues(Name, fmt.Sprintf("%T", action)).Observe(float64(time.Now().Sub(start))) + if wf.GetStatus().Ready() { // TODO only once + t, _ := ptypes.Timestamp(wf.GetMetadata().GetCreatedAt()) + workflowProcessDuration.Observe(float64(time.Now().Sub(t))) + } + workflowStatus.WithLabelValues(wf.GetStatus().GetStatus().String()).Inc() } func (c *Controller) Close() error { @@ -192,6 +214,7 @@ func (c *Controller) submitEval(ids ...string) bool { for _, id := range ids { select { case c.evalQueue <- id: + controller.EvalQueueSize.WithLabelValues(Name).Inc() return true // ok default: diff --git a/pkg/fes/backend/mem/mem.go b/pkg/fes/backend/mem/mem.go index c51ff7fb..87333a19 100644 --- a/pkg/fes/backend/mem/mem.go +++ b/pkg/fes/backend/mem/mem.go @@ -6,10 +6,18 @@ import ( "github.com/fission/fission-workflows/pkg/fes" "github.com/fission/fission-workflows/pkg/util/pubsub" + "github.com/prometheus/client_golang/prometheus" ) var ( ErrInvalidAggregate = errors.New("invalid aggregate") + + eventsAppended = prometheus.NewCounterVec(prometheus.CounterOpts{ + Namespace: "fes", + Subsystem: "mem", + Name: "events_appended_total", + Help: "Count of appended events (excluding any internal events).", + }, []string{"eventType"}) ) // An in-memory, fes backend for development and testing purposes @@ -42,6 +50,7 @@ func (b *Backend) Append(event *fes.Event) error { } b.contents[key] = append(events, event) + eventsAppended.WithLabelValues(event.Type).Inc() return b.Publish(event) } diff --git a/pkg/fes/backend/nats/client.go b/pkg/fes/backend/nats/client.go index feef81fd..d7cbfebd 100644 --- a/pkg/fes/backend/nats/client.go +++ b/pkg/fes/backend/nats/client.go @@ -3,11 +3,13 @@ package nats import ( "fmt" "strings" + "time" "github.com/fission/fission-workflows/pkg/fes" "github.com/fission/fission-workflows/pkg/util/pubsub" "github.com/golang/protobuf/proto" - nats "github.com/nats-io/go-nats" + "github.com/golang/protobuf/ptypes" + "github.com/nats-io/go-nats" "github.com/nats-io/go-nats-streaming" "github.com/prometheus/client_golang/prometheus" "github.com/sirupsen/logrus" @@ -18,26 +20,33 @@ const ( ) var ( - subsActive = prometheus.NewGauge(prometheus.GaugeOpts{ + subsActive = prometheus.NewGaugeVec(prometheus.GaugeOpts{ Namespace: "fes", Subsystem: "nats", Name: "subs_active", Help: "Number of active subscriptions to NATS subjects.", - }) + }, []string{"subType"}) - eventsAppended = prometheus.NewCounter(prometheus.CounterOpts{ + eventsAppended = prometheus.NewCounterVec(prometheus.CounterOpts{ Namespace: "fes", Subsystem: "nats", Name: "events_appended_total", - Help: "Count of appended events (excluding any internal events).", + Help: "Count of appended events (including any internal events).", + }, []string{"eventType"}) + + eventDelay = prometheus.NewSummary(prometheus.SummaryOpts{ + Namespace: "fes", + Subsystem: "nats", + Name: "event_propagation_delay", + Help: "Delay between event publish and receive by the subscribers.", }) ) func init() { - prometheus.MustRegister(subsActive) - prometheus.MustRegister(eventsAppended) + prometheus.MustRegister(subsActive, eventsAppended, eventDelay) } +// EventStore is a NATS-based implementation of the EventStore interface. type EventStore struct { pubsub.Publisher conn *WildcardConn @@ -46,14 +55,9 @@ type EventStore struct { } type Config struct { - //Cluster: clusterId, - //Client: "someClient", - //URL: fmt.Sprintf("nats://%s:%d", address, port), Cluster string Client string - - // Example: nats://localhost:9300 - URL string + URL string // e.g. nats://localhost:9300 } func NewEventStore(conn *WildcardConn, cfg Config) *EventStore { @@ -65,6 +69,7 @@ func NewEventStore(conn *WildcardConn, cfg Config) *EventStore { } } +// Connect to a NATS cluster using the config. func Connect(cfg Config) (*EventStore, error) { if cfg.Client == "" { cfg.Client = defaultClient @@ -86,13 +91,14 @@ func Connect(cfg Config) (*EventStore, error) { return NewEventStore(wconn, cfg), nil } -// Watch a aggregate type +// Watch a aggregate type for new events. The events are emitted over the publisher interface. func (es *EventStore) Watch(aggregate fes.Aggregate) error { subject := fmt.Sprintf("%s.>", aggregate.Type) sub, err := es.conn.Subscribe(subject, func(msg *stan.Msg) { event, err := toEvent(msg) if err != nil { logrus.Error(err) + return } logrus.WithFields(logrus.Fields{ @@ -106,7 +112,13 @@ func (es *EventStore) Watch(aggregate fes.Aggregate) error { err = es.Publisher.Publish(event) if err != nil { logrus.Error(err) + return } + + // Record the time it took for the event to be propagated from publisher to subscriber. + ts, _ := ptypes.Timestamp(event.Timestamp) + eventDelay.Observe(float64(time.Now().Sub(ts).Nanoseconds())) + }, stan.DeliverAllAvailable()) if err != nil { return err @@ -114,7 +126,6 @@ func (es *EventStore) Watch(aggregate fes.Aggregate) error { logrus.Infof("Backend client watches:' %s'", subject) es.sub[aggregate] = sub - subsActive.Inc() return nil } @@ -123,10 +134,10 @@ func (es *EventStore) Close() error { if err != nil { return err } - subsActive.Dec() return nil } +// Append publishes (and persists) an event on the NATS message queue func (es *EventStore) Append(event *fes.Event) error { // TODO make generic / configurable whether to fold event into parent's Subject subject := toSubject(event.Aggregate) @@ -142,16 +153,18 @@ func (es *EventStore) Append(event *fes.Event) error { "aggregate": event.Aggregate.Format(), "parent": event.Parent.Format(), "nats.subject": subject, - }).Infof("Appending event: %v", event.Type) + }).Infof("Event added: %v", event.Type) err = es.conn.Publish(subject, data) if err != nil { return err } - eventsAppended.Inc() + eventsAppended.WithLabelValues(event.Type).Inc() + eventsAppended.WithLabelValues("control").Inc() return nil } +// Get returns all events related to a specific aggregate func (es *EventStore) Get(aggregate *fes.Aggregate) ([]*fes.Event, error) { subject := toSubject(aggregate) @@ -171,6 +184,7 @@ func (es *EventStore) Get(aggregate *fes.Aggregate) ([]*fes.Event, error) { return results, nil } +// List returns all entities of which the subject matches the StringMatcher func (es *EventStore) List(matcher fes.StringMatcher) ([]fes.Aggregate, error) { subjects, err := es.conn.List(matcher) if err != nil { diff --git a/pkg/fes/backend/nats/nats.go b/pkg/fes/backend/nats/nats.go index 060c4a52..dd936e0e 100644 --- a/pkg/fes/backend/nats/nats.go +++ b/pkg/fes/backend/nats/nats.go @@ -113,10 +113,10 @@ func (cn *Conn) MsgSeqRange(subject string, seqStart uint64, seqEnd uint64) ([]* if err != nil { return nil, err } - subsActive.Inc() + subsActive.WithLabelValues("counter").Inc() defer func() { sub.Close() - subsActive.Dec() + subsActive.WithLabelValues("counter").Dec() }() for { @@ -142,7 +142,6 @@ func NewWildcardConn(conn stan.Conn) *WildcardConn { func (wc *WildcardConn) Subscribe(wildcardSubject string, cb stan.MsgHandler, opts ...stan.SubscriptionOption) (stan.Subscription, error) { if !hasWildcard(wildcardSubject) { - subsActive.Inc() return wc.Conn.Subscribe(wildcardSubject, cb, opts...) } @@ -184,6 +183,7 @@ func (wc *WildcardConn) Subscribe(wildcardSubject string, cb stan.MsgHandler, op logrus.Errorf("Failed to subscribe to wildcardSubject '%v': %v", subjectEvent, err) } ws.sources[subject] = sub + subsActive.WithLabelValues(subjectEvent.Subject[:strings.Index(subjectEvent.Subject, ".")]).Inc() } case deleted: // Delete the current listener of the subject of the event @@ -192,7 +192,7 @@ func (wc *WildcardConn) Subscribe(wildcardSubject string, cb stan.MsgHandler, op if err != nil { logrus.Errorf("Failed to close (sub)listener: %v", err) } - subsActive.Dec() + subsActive.WithLabelValues(subjectEvent.Subject[:strings.Index(subjectEvent.Subject, ".")]).Dec() } default: panic(fmt.Sprintf("Unknown eventType: %v", subjectEvent)) @@ -293,7 +293,6 @@ func (ws *WildcardSub) Unsubscribe() error { err := ws.activitySub.Unsubscribe() for id, source := range ws.sources { err = source.Unsubscribe() - subsActive.Dec() delete(ws.sources, id) } return err diff --git a/pkg/fes/caches.go b/pkg/fes/caches.go index a0ef9101..d0772aec 100644 --- a/pkg/fes/caches.go +++ b/pkg/fes/caches.go @@ -3,6 +3,7 @@ package fes import ( "context" "errors" + "fmt" "sync" "time" @@ -22,14 +23,13 @@ const ( var ( ErrNotFound = errors.New("could not find entity") - cacheCount = prometheus.NewGauge(prometheus.GaugeOpts{ + cacheCount = prometheus.NewGaugeVec(prometheus.GaugeOpts{ Namespace: "fes", Subsystem: "cache", Name: "current_cache_counts", Help: "The current number of entries in the caches", - }) - - // TODO add metrics: cache size, access latencies, update latencies (from event creation -> notification) + }, []string{"name"}) + // TODO add additional metrics once cache has been improved ) func init() { @@ -38,14 +38,25 @@ func init() { // MapCache provides a simple non-preempting map-based CacheReaderWriter implementation. type MapCache struct { + Name string contents map[string]map[string]Aggregator // Map: AggregateType -> AggregateId -> entity lock *sync.RWMutex } func NewMapCache() *MapCache { + c := &MapCache{ + contents: map[string]map[string]Aggregator{}, + lock: &sync.RWMutex{}, + } + c.Name = fmt.Sprintf("%p", c) + return c +} + +func NewNamedMapCache(name string) *MapCache { return &MapCache{ contents: map[string]map[string]Aggregator{}, lock: &sync.RWMutex{}, + Name: name, } } @@ -108,7 +119,7 @@ func (rc *MapCache) Put(entity Aggregator) error { rc.contents[ref.Type] = map[string]Aggregator{} } rc.contents[ref.Type][ref.Id] = entity - cacheCount.Inc() + cacheCount.WithLabelValues(rc.Name).Inc() return nil } @@ -116,7 +127,7 @@ func (rc *MapCache) Invalidate(ref *Aggregate) { rc.lock.Lock() defer rc.lock.Unlock() delete(rc.contents[ref.Type], ref.Id) - cacheCount.Dec() + cacheCount.WithLabelValues(rc.Name).Dec() } func (rc *MapCache) List() []Aggregate { diff --git a/pkg/fnenv/fission/envproxy.go b/pkg/fnenv/fission/envproxy.go index 7bdaeb44..4612c8cf 100644 --- a/pkg/fnenv/fission/envproxy.go +++ b/pkg/fnenv/fission/envproxy.go @@ -25,6 +25,7 @@ import ( // Proxy between Fission and ParseWorkflow to ensure that workflowInvocations comply with Fission function interface. This // ensures that workflows can be executed exactly like Fission functions are executed. type Proxy struct { + // TODO change server to client invocationServer apiserver.WorkflowInvocationAPIServer workflowServer apiserver.WorkflowAPIServer fissionIds syncmap.Map // map[string]bool diff --git a/pkg/fnenv/fission/resolver.go b/pkg/fnenv/fission/resolver.go index 9d7ef8fc..bb214617 100644 --- a/pkg/fnenv/fission/resolver.go +++ b/pkg/fnenv/fission/resolver.go @@ -28,6 +28,5 @@ func (re *Resolver) Resolve(fnName string) (string, error) { id := fnName log.Infof("Resolved fission function %s to %s", fnName, id) - return id, nil } diff --git a/pkg/fnenv/fission/runtime.go b/pkg/fnenv/fission/runtime.go index 256aefd6..a8bbb0b6 100644 --- a/pkg/fnenv/fission/runtime.go +++ b/pkg/fnenv/fission/runtime.go @@ -6,6 +6,7 @@ import ( "net/url" "time" + "github.com/fission/fission-workflows/pkg/fnenv" "github.com/fission/fission-workflows/pkg/fnenv/common/httpconv" "github.com/fission/fission-workflows/pkg/types/validate" "github.com/sirupsen/logrus" @@ -17,6 +18,10 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" ) +const ( + Name = "fission" +) + var log = logrus.WithField("component", "fnenv.fission") // FunctionEnv adapts the Fission platform to the function execution runtime. This allows the workflow engine @@ -65,11 +70,16 @@ func (fe *FunctionEnv) Invoke(spec *types.TaskInvocationSpec) (*types.TaskInvoca } // Perform request + timeStart := time.Now() + fnenv.FnActive.WithLabelValues(Name).Inc() + defer fnenv.FnExecTime.WithLabelValues(Name).Observe(float64(time.Since(timeStart))) ctxLog.Infof("Invoking Fission function: '%v'.", req.URL) resp, err := http.DefaultClient.Do(req) if err != nil { return nil, fmt.Errorf("error for reqUrl '%v': %v", url, err) } + fnenv.FnActive.WithLabelValues(Name).Dec() + fnenv.FnActive.WithLabelValues(Name).Inc() // Parse output output, err := httpconv.ParseBody(resp.Body, resp.Header.Get("Content-Type")) diff --git a/pkg/fnenv/fission/timed.go b/pkg/fnenv/fission/timed.go index 99080baf..e14e7bc3 100644 --- a/pkg/fnenv/fission/timed.go +++ b/pkg/fnenv/fission/timed.go @@ -6,6 +6,7 @@ import ( "time" ) +// TimedExecPool provides a data structure for scheduling executions based on a timestamp. type TimedExecPool struct { fnQueue *timedFnQueue cancel chan struct{} diff --git a/pkg/fnenv/fnenv.go b/pkg/fnenv/fnenv.go index e2125d83..c2e57233 100644 --- a/pkg/fnenv/fnenv.go +++ b/pkg/fnenv/fnenv.go @@ -19,12 +19,38 @@ import ( "time" "github.com/fission/fission-workflows/pkg/types" + "github.com/prometheus/client_golang/prometheus" ) var ( ErrInvalidRuntime = errors.New("invalid runtime") + + FnActive = prometheus.NewGaugeVec(prometheus.GaugeOpts{ + Namespace: "workflows", + Subsystem: "fnenv", + Name: "functions_active", + Help: "Number of Fission function executions that are currently active", + }, []string{"fnenv"}) + + FnCount = prometheus.NewCounterVec(prometheus.CounterOpts{ + Namespace: "fnenv", + Subsystem: "fission", + Name: "functions_execution_total", + Help: "Total number of Fission function executions", + }, []string{"fnenv"}) + + FnExecTime = prometheus.NewSummaryVec(prometheus.SummaryOpts{ + Namespace: "fnenv", + Subsystem: "fission", + Name: "function_execution_time_milliseconds", + Help: "Execution time summary of the Fission functions", + }, []string{"fnenv"}) ) +func init() { + prometheus.MustRegister(FnActive, FnCount, FnExecTime) +} + // Runtime is the minimal interface that a function runtime environment needs to conform with to handle tasks. type Runtime interface { // Invoke executes the task in a blocking way. diff --git a/pkg/fnenv/native/native.go b/pkg/fnenv/native/native.go index 1ccf1331..211b5307 100644 --- a/pkg/fnenv/native/native.go +++ b/pkg/fnenv/native/native.go @@ -6,11 +6,11 @@ import ( "runtime/debug" "time" + "github.com/fission/fission-workflows/pkg/fnenv" "github.com/fission/fission-workflows/pkg/types" "github.com/fission/fission-workflows/pkg/types/validate" "github.com/golang/protobuf/ptypes" - "github.com/prometheus/client_golang/prometheus" log "github.com/sirupsen/logrus" ) @@ -18,40 +18,6 @@ const ( Name = "native" ) -var ( - fnActive = prometheus.NewGauge(prometheus.GaugeOpts{ - Namespace: "fnenv", - Subsystem: "native", - Name: "functions_active", - Help: "Number of function executions that are currently active", - }) - - fnCount = prometheus.NewCounter(prometheus.CounterOpts{ - Namespace: "fnenv", - Subsystem: "native", - Name: "functions_execution_total", - Help: "Total number of function executions", - }) - - fnResolved = prometheus.NewCounter(prometheus.CounterOpts{ - Namespace: "fnenv", - Subsystem: "native", - Name: "functions_resolved_total", - Help: "Total number of function resolved", - }) - - fnExecTime = prometheus.NewSummary(prometheus.SummaryOpts{ - Namespace: "fnenv", - Subsystem: "native", - Name: "function_execution_time_milliseconds", - Help: "Execution time summary of the internal functions", - }) -) - -func init() { - prometheus.MustRegister(fnActive, fnResolved, fnExecTime, fnCount) -} - // An InternalFunction is a function that will be executed in the same process as the invoker. type InternalFunction interface { Invoke(spec *types.TaskInvocationSpec) (*types.TypedValue, error) @@ -87,16 +53,16 @@ func (fe *FunctionEnv) Invoke(spec *types.TaskInvocationSpec) (*types.TaskInvoca } timeStart := time.Now() - defer fnExecTime.Observe(float64(time.Since(timeStart))) + defer fnenv.FnExecTime.WithLabelValues(Name).Observe(float64(time.Since(timeStart))) fnID := spec.FnRef.ID fn, ok := fe.fns[fnID] if !ok { return nil, fmt.Errorf("could not resolve internal function '%s'", fnID) } - fnActive.Inc() + fnenv.FnActive.WithLabelValues(Name).Inc() out, err := fn.Invoke(spec) - fnActive.Dec() - fnCount.Inc() + fnenv.FnActive.WithLabelValues(Name).Dec() + fnenv.FnCount.WithLabelValues(Name).Inc() if err != nil { log.WithFields(log.Fields{ "fnID": fnID, @@ -119,7 +85,6 @@ func (fe *FunctionEnv) Invoke(spec *types.TaskInvocationSpec) (*types.TaskInvoca } func (fe *FunctionEnv) Resolve(fnName string) (string, error) { - fnResolved.Inc() _, ok := fe.fns[fnName] if !ok { return "", fmt.Errorf("could not resolve internal function '%s'", fnName) diff --git a/pkg/fnenv/resolver.go b/pkg/fnenv/resolver.go index 2662b6e9..82d73856 100644 --- a/pkg/fnenv/resolver.go +++ b/pkg/fnenv/resolver.go @@ -6,6 +6,7 @@ import ( "time" "github.com/fission/fission-workflows/pkg/types" + "github.com/prometheus/client_golang/prometheus" "github.com/sirupsen/logrus" ) @@ -13,6 +14,19 @@ const ( defaultTimeout = time.Duration(1) * time.Minute ) +var ( + fnResolved = prometheus.NewCounterVec(prometheus.CounterOpts{ + Namespace: "fnenv", + Subsystem: "fission", + Name: "functions_resolved_total", + Help: "Total number of Fission functions resolved", + }, []string{"fnenv"}) +) + +func init() { + prometheus.MustRegister(fnResolved) +} + // MetaResolver contacts function execution runtime clients to resolve the function definitions to concrete function ids. // // ParseTask definitions (See types/TaskDef) can contain the following function reference: @@ -94,6 +108,8 @@ func (ps *MetaResolver) resolveForRuntime(targetFn string, runtime string) (type if err != nil { return types.FnRef{}, err } + + fnResolved.WithLabelValues(runtime).Inc() return types.FnRef{ Runtime: runtime, ID: rsv, diff --git a/pkg/fnenv/workflows/workflows.go b/pkg/fnenv/workflows/workflows.go index e240f60f..ad99ea4a 100644 --- a/pkg/fnenv/workflows/workflows.go +++ b/pkg/fnenv/workflows/workflows.go @@ -8,6 +8,7 @@ import ( "github.com/fission/fission-workflows/pkg/api" "github.com/fission/fission-workflows/pkg/fes" + "github.com/fission/fission-workflows/pkg/fnenv" "github.com/fission/fission-workflows/pkg/types" "github.com/fission/fission-workflows/pkg/types/aggregates" "github.com/fission/fission-workflows/pkg/types/typedvalues" @@ -50,8 +51,19 @@ func (rt *Runtime) Invoke(spec *types.TaskInvocationSpec) (*types.TaskInvocation wfSpec.ParentId = parentID } + wfi, err := rt.InvokeWorkflow(wfSpec) + if err != nil { + return nil, err + } + return wfi.Status.ToTaskStatus(), nil +} + +func (rt *Runtime) InvokeWorkflow(spec *types.WorkflowInvocationSpec) (*types.WorkflowInvocation, error) { // Invoke workflow - wfiID, err := rt.api.Invoke(wfSpec) + timeStart := time.Now() + defer fnenv.FnExecTime.WithLabelValues(Name).Observe(float64(time.Since(timeStart))) + fnenv.FnActive.WithLabelValues(Name).Inc() + wfiID, err := rt.api.Invoke(spec) if err != nil { logrus.Errorf("Failed to invoke workflow: %v", err) return nil, err @@ -83,6 +95,8 @@ func (rt *Runtime) Invoke(spec *types.TaskInvocationSpec) (*types.TaskInvocation time.Sleep(invokeSyncPollingInterval) } } + fnenv.FnActive.WithLabelValues(Name).Dec() + fnenv.FnCount.WithLabelValues(Name).Inc() - return result.Status.ToTaskStatus(), nil + return result, nil } diff --git a/pkg/scheduler/scheduler.go b/pkg/scheduler/scheduler.go index 71ef1bb4..2917fa85 100644 --- a/pkg/scheduler/scheduler.go +++ b/pkg/scheduler/scheduler.go @@ -2,15 +2,36 @@ package scheduler import ( "fmt" + "time" "github.com/fission/fission-workflows/pkg/types" "github.com/fission/fission-workflows/pkg/types/graph" "github.com/golang/protobuf/ptypes" + "github.com/prometheus/client_golang/prometheus" "github.com/sirupsen/logrus" ) var log = logrus.WithField("component", "scheduler") +var ( + metricEvalTime = prometheus.NewSummary(prometheus.SummaryOpts{ + Namespace: "workflows", + Subsystem: "scheduler", + Name: "eval_time", + Help: "Statistics of scheduler evaluations", + }) + metricEvalCount = prometheus.NewCounter(prometheus.CounterOpts{ + Namespace: "workflows", + Subsystem: "scheduler", + Name: "eval_count", + Help: "Number of evaluations", + }) +) + +func init() { + prometheus.MustRegister(metricEvalCount, metricEvalTime) +} + type WorkflowScheduler struct { } @@ -18,6 +39,11 @@ func (ws *WorkflowScheduler) Evaluate(request *ScheduleRequest) (*Schedule, erro ctxLog := log.WithFields(logrus.Fields{ "wfi": request.Invocation.Metadata.Id, }) + timeStart := time.Now() + defer func() { + metricEvalTime.Observe(float64(time.Since(timeStart))) + metricEvalCount.Inc() + }() schedule := &Schedule{ InvocationId: request.Invocation.Metadata.Id, diff --git a/pkg/types/extensions.go b/pkg/types/extensions.go index b7701219..a9c8acbd 100644 --- a/pkg/types/extensions.go +++ b/pkg/types/extensions.go @@ -308,6 +308,10 @@ func (m *WorkflowStatus) Ready() bool { return m.Status == WorkflowStatus_READY } +func (m *WorkflowStatus) Failed() bool { + return m.Status == WorkflowStatus_FAILED +} + func (m *WorkflowStatus) AddTaskStatus(id string, t *TaskStatus) { if m.Tasks == nil { m.Tasks = map[string]*TaskStatus{} From 09fea1225ef95478ab2639956682df46386b0b08 Mon Sep 17 00:00:00 2001 From: erwinvaneyk Date: Thu, 7 Jun 2018 15:21:41 +0200 Subject: [PATCH 4/4] Added a document describing the instrumentation setup --- Docs/README.md | 3 ++- Docs/instrumentation.md | 56 +++++++++++++++++++++++++++++++++++++++++ 2 files changed, 58 insertions(+), 1 deletion(-) create mode 100644 Docs/instrumentation.md diff --git a/Docs/README.md b/Docs/README.md index 59f8cd21..0b24cdbb 100644 --- a/Docs/README.md +++ b/Docs/README.md @@ -7,4 +7,5 @@ - [Functions](./functions.md) - [Data](data.md) - [Roadmap](./roadmap.md) -- [Deployment Administration](./admin.md) \ No newline at end of file +- [Deployment Administration](./admin.md) +- [Instrumentation and Logging](./instrumentation.md) \ No newline at end of file diff --git a/Docs/instrumentation.md b/Docs/instrumentation.md new file mode 100644 index 00000000..89f630d8 --- /dev/null +++ b/Docs/instrumentation.md @@ -0,0 +1,56 @@ +# Instrumentation + +Fission Workflows provides (or aims to provide) several forms of instrumentation to help you gain insight into +the engine. + +It contains the following main features: +- It uses Prometheus to expose various, both low-level and high-level, metrics. +- Logging is provided using the Golang library logrus for structured logging. +- (future) A predefined Grafana dashboard to provide visualizations of the various Prometheus metrics. + +## Prometheus + +When enabled---using the `--metrics` flag---the workflow engine collects exposes various metrics over HTTP. +The default path to the exposes metrics is `${WORKFLOWS_IP}:8080/metrics`. + +These metrics are scraped at a regular interval by a Prometheus instance, which is not included in Fission. +To setup Prometheus quickly: +```bash +helm install --namespace monitoring --name prometheus stable/prometheus +``` + +To enable prometheus to discover Fission Workflows, specific annotations need to be present on the pod. +For now you need to manually add annotations to the Fission Workflows deployment in the `fission-function` namespace. +```bash +NS=fission-function +kubectl -n ${NS} edit $(kubectl get po -n ${NS} -o name | grep workflow) +``` + +And add the following to the metadata. +```bash +annotations: + prometheus.io/path: /metrics + prometheus.io/port: "8080" + prometheus.io/scrape: "true" +``` + +(In the near future, this step will be done by Fission automatically.) + +Now, to access metrics in the Prometheus dashboard you just need to exposes the prometheus-server in the `monitoring` +namespace or access the cluserIP from within the cluster. + +### Prometheus NATS exporter +Given that NATS streaming plays an important role in the workflow system, it is also useful to collect the metrics of +NATS into prometheus. Although not directly implemented in the NATS deployments, there is the +[Prometheus NATS exporter](https://github.com/nats-io/prometheus-nats-exporter) as a separate module to install. + +## Grafana +A common way to visualize the metrics collected with Prometheus is to use Grafana to create and share graphs and +other types of visualizations. + +```bash +helm install --namespace monitoring --name prometheus stable/grafana +``` + +In the future, we will provide a pre-built Grafana dashboard with useful graphs to provide you insight into the +system, without needing to build dashboards yourself. \ No newline at end of file