Skip to content

Commit

Permalink
Merge pull request #157 from fission/log-correlation
Browse files Browse the repository at this point in the history
Log correlation
  • Loading branch information
erwinvaneyk authored Jun 25, 2018
2 parents d85e4ba + 9f050d0 commit e8b5f22
Show file tree
Hide file tree
Showing 13 changed files with 195 additions and 117 deletions.
115 changes: 81 additions & 34 deletions cmd/fission-workflows-bundle/bundle/bundle.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,11 @@ package bundle

import (
"context"
"fmt"
"net"
"net/http"
"os"
"time"

"github.com/fission/fission-workflows/pkg/api"
"github.com/fission/fission-workflows/pkg/apiserver"
Expand Down Expand Up @@ -63,7 +65,10 @@ type FissionOptions struct {

// Run serves enabled components in a blocking way
func Run(ctx context.Context, opts *Options) error {
log.WithField("version", version.VersionInfo()).Info("Starting bundle...")
log.WithFields(log.Fields{
"version": fmt.Sprintf("%+v", version.VersionInfo()),
"config": fmt.Sprintf("%+v", opts),
}).Info("Starting bundle...")

var es fes.Backend
var esPub pubsub.Publisher
Expand All @@ -72,9 +77,10 @@ func Run(ctx context.Context, opts *Options) error {
grpc.StreamInterceptor(grpc_prometheus.StreamServerInterceptor),
grpc.UnaryInterceptor(grpc_prometheus.UnaryServerInterceptor),
)
defer grpcServer.GracefulStop()

// Event Stores
//
// Event Store
//
if opts.Nats != nil {
log.WithFields(log.Fields{
"url": "!redacted!",
Expand All @@ -95,7 +101,9 @@ func Run(ctx context.Context, opts *Options) error {
wfiCache := getWorkflowInvocationCache(ctx, esPub)
wfCache := getWorkflowCache(ctx, esPub)

// Resolvers and runtimes
//
// Function Runtimes
//
invocationAPI := api.NewInvocationAPI(es)
resolvers := map[string]fnenv.RuntimeResolver{}
runtimes := map[string]fnenv.Runtime{}
Expand All @@ -109,8 +117,10 @@ func Run(ctx context.Context, opts *Options) error {
}
if opts.InternalRuntime {
log.Infof("Using Task Runtime: Internal")
runtimes["internal"] = setupInternalFunctionRuntime()
resolvers["internal"] = setupInternalFunctionRuntime()
internalRuntime := setupInternalFunctionRuntime()
runtimes["internal"] = internalRuntime
resolvers["internal"] = internalRuntime
log.Infof("Internal runtime functions: %v", internalRuntime.Installed())
}
if opts.Fission != nil {
log.WithFields(log.Fields{
Expand All @@ -122,7 +132,9 @@ func Run(ctx context.Context, opts *Options) error {
resolvers["fission"] = setupFissionFunctionResolver(opts.Fission.ControllerAddr)
}

// Controller
//
// Controllers
//
if opts.InvocationController || opts.WorkflowController {
var ctrls []controller.Controller
if opts.WorkflowController {
Expand All @@ -135,26 +147,51 @@ func Run(ctx context.Context, opts *Options) error {
ctrls = append(ctrls, setupInvocationController(wfiCache(), wfCache(), es, runtimes, resolvers))
}

log.Info("Running controllers.")
runController(ctx, ctrls...)
ctrl := controller.NewMetaController(ctrls...)
go ctrl.Run(ctx)
defer func() {
err := ctrl.Close()
if err != nil {
log.Errorf("Failed to stop controllers: %v", err)
} else {
log.Info("Stopped controllers")
}
}()
} else {
log.Info("No controllers specified to run.")
}

// Http servers
//
// Fission integration
//
if opts.Fission != nil {
proxyMux := http.NewServeMux()
runFissionEnvironmentProxy(proxyMux, es, wfiCache(), wfCache(), resolvers)
proxySrv := &http.Server{Addr: fissionProxyAddress}
proxySrv.Handler = handlers.LoggingHandler(os.Stdout, proxyMux)
fissionProxySrv := &http.Server{Addr: fissionProxyAddress}
fissionProxySrv.Handler = handlers.LoggingHandler(os.Stdout, proxyMux)

if opts.Metrics {
setupMetricsEndpoint(proxyMux)
}

go proxySrv.ListenAndServe()
defer proxySrv.Shutdown(ctx)
log.Info("Serving HTTP Fission Proxy at: ", proxySrv.Addr)
go func() {
err := fissionProxySrv.ListenAndServe()
log.WithField("err", err).Info("Fission Proxy server stopped")
}()
defer func() {
err := fissionProxySrv.Shutdown(ctx)
if err != nil {
log.Errorf("Failed to stop Fission Proxy server: %v", err)
} else {
log.Info("Stopped Fission Proxy server")
}
}()
log.Info("Serving HTTP Fission Proxy at: ", fissionProxySrv.Addr)
}

//
// gRPC API
//
if opts.AdminAPI {
serveAdminAPI(grpcServer)
}
Expand All @@ -168,18 +205,27 @@ func Run(ctx context.Context, opts *Options) error {
}

if opts.AdminAPI || opts.WorkflowAPI || opts.InvocationAPI {
log.Info("Instrumenting gRPC server with Prometheus metrics")
grpc_prometheus.Register(grpcServer)
if opts.Metrics {
log.Debug("Instrumenting gRPC server with Prometheus metrics")
grpc_prometheus.Register(grpcServer)
}

lis, err := net.Listen("tcp", gRPCAddress)
if err != nil {
log.Fatalf("failed to listen: %v", err)
}
defer lis.Close()
log.Info("Serving gRPC services at: ", lis.Addr())
go grpcServer.Serve(lis)
defer func() {
grpcServer.GracefulStop()
lis.Close()
log.Info("Stopped gRPC server")
}()
log.Info("Serving gRPC services at: ", lis.Addr())
}

//
// HTTP API
//
if opts.HTTPGateway || opts.Metrics {
grpcMux := grpcruntime.NewServeMux()
httpMux := http.NewServeMux()
Expand All @@ -199,28 +245,34 @@ func Run(ctx context.Context, opts *Options) error {
serveHTTPGateway(ctx, grpcMux, admin, wf, wfi)
}

// Metrics
if opts.Metrics {
setupMetricsEndpoint(httpMux)
log.Infof("Set up prometheus collector: %v/metrics", apiGatewayAddress)
}

apiSrv := &http.Server{Addr: apiGatewayAddress}
httpApiSrv := &http.Server{Addr: apiGatewayAddress}
httpMux.Handle("/", grpcMux)
apiSrv.Handler = httpMux
httpApiSrv.Handler = handlers.LoggingHandler(os.Stdout, httpMux)
go func() {
err := apiSrv.ListenAndServe()
log.WithField("err", err).Info("HTTP Gateway exited")
err := httpApiSrv.ListenAndServe()
log.WithField("err", err).Info("HTTP Gateway stopped")
}()
defer func() {
err := httpApiSrv.Shutdown(ctx)
if err != nil {
log.Errorf("Failed to stop HTTP API server: %v", err)
} else {
log.Info("Stopped HTTP API server")
}
}()
defer apiSrv.Shutdown(ctx)

log.Info("Serving HTTP API gateway at: ", apiSrv.Addr)
log.Info("Serving HTTP API gateway at: ", httpApiSrv.Addr)
}

log.Info("Bundle set up.")
log.Info("Setup completed.")
<-ctx.Done()
log.Info("Shutting down...")
// TODO properly shutdown components
log.WithField("reason", ctx.Err()).Info("Shutting down...")
time.Sleep(5 * time.Second)
return nil
}

Expand Down Expand Up @@ -391,11 +443,6 @@ func setupWorkflowController(wfCache fes.CacheReader, es fes.Backend,
return wfctr.NewController(wfCache, workflowAPI)
}

func runController(ctx context.Context, ctrls ...controller.Controller) {
ctrl := controller.NewMetaController(ctrls...)
go ctrl.Run(ctx)
}

// setupMetricsEndpoint registers the Prometheus HTTP handler on apiMux under
// the "/metrics" path.
func setupMetricsEndpoint(apiMux *http.ServeMux) {
	metricsHandler := promhttp.Handler()
	apiMux.Handle("/metrics", metricsHandler)
}
2 changes: 1 addition & 1 deletion pkg/api/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,6 @@ func (ap *Task) Invoke(spec *types.TaskInvocationSpec) (*types.TaskInvocation, e
return nil, err
}

aggregate := aggregates.NewWorkflowInvocationAggregate(spec.InvocationId)
taskID := spec.TaskId // assumption: 1 task == 1 TaskInvocation (How to deal with retries? Same invocation?)
fn := &types.TaskInvocation{
Metadata: &types.ObjectMetadata{
Expand All @@ -58,6 +57,7 @@ func (ap *Task) Invoke(spec *types.TaskInvocationSpec) (*types.TaskInvocation, e
return nil, err
}

aggregate := aggregates.NewWorkflowInvocationAggregate(spec.InvocationId)
err = ap.es.Append(&fes.Event{
Type: events.Task_TASK_STARTED.String(),
Parent: aggregate,
Expand Down
1 change: 0 additions & 1 deletion pkg/controller/actions.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@ type ActionRemoveFromEvalCache struct {
}

// Apply deletes the entry keyed by a.ID from a.EvalCache, logging the removal
// beforehand. It always returns nil.
func (a *ActionRemoveFromEvalCache) Apply() error {
log.Infof("Removing '%s' from EvalCache", a.ID)
a.EvalCache.Del(a.ID)
return nil
}
Expand Down
29 changes: 11 additions & 18 deletions pkg/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,7 @@ const (
)

var (
log = logrus.New().WithFields(logrus.Fields{"component": "controller"})
metaLog = log.WithField("controller", "controller-meta")
metaLog = logrus.New().WithFields(logrus.Fields{"component": "controller"})

// Controller-related metrics
EvalJobs = prometheus.NewCounterVec(prometheus.CounterOpts{
Expand Down Expand Up @@ -88,16 +87,14 @@ func NewMetaController(ctrls ...Controller) *MetaController {
}

// Init calls Init on each controller held in mc.ctrls, in slice order, passing
// ctx through. It returns the first error encountered, or nil when every
// controller initialized successfully.
func (mc *MetaController) Init(ctx context.Context) error {
metaLog.Info("Running MetaController init.")
for _, ctrl := range mc.ctrls {
err := ctrl.Init(ctx)
if err != nil {
// Abort on the first failure; controllers later in the slice are not initialized.
return err
}
// Log the concrete controller type so the initialized controller can be identified.
metaLog.Infof("'%s' controller init done.", reflect.TypeOf(ctrl))
metaLog.Debugf("Controller '%s' initialized.", reflect.TypeOf(ctrl))
}

metaLog.Info("Finished MetaController init.")
return nil
}

Expand Down Expand Up @@ -151,21 +148,17 @@ func (mc *MetaController) Notify(msg *fes.Notification) error {
}

func (mc *MetaController) Close() error {
metaLog.Info("Closing metacontroller and its controllers...")
var err error
var merr error
for _, ctrl := range mc.ctrls {
if closer, ok := ctrl.(io.Closer); ok {
err = closer.Close()
err := closer.Close()
if err != nil {
merr = err
metaLog.Errorf("Failed to stop controller %s: %v", reflect.TypeOf(ctrl), err)
} else {
metaLog.Debugf("Controller '%s' stopped.", reflect.TypeOf(ctrl))
}
}
}
metaLog.Info("Closed MetaController")
return err
}

func (mc *MetaController) Halt() {
mc.suspended = false
}

func (mc *MetaController) Resume() {
mc.suspended = true
return merr
}
Loading

0 comments on commit e8b5f22

Please sign in to comment.