From 5f1695718ef8cb71c798e5db81c4dd792f7e8e26 Mon Sep 17 00:00:00 2001 From: "James W. Brinkerhoff" Date: Thu, 25 Feb 2021 10:45:06 -0500 Subject: [PATCH 1/3] Add flag to allow you to disable tink-worker action logging rollup Signed-off-by: James W. Brinkerhoff --- cmd/tink-worker/cmd/root.go | 5 ++++- cmd/tink-worker/internal/worker.go | 12 +++++++----- 2 files changed, 11 insertions(+), 6 deletions(-) diff --git a/cmd/tink-worker/cmd/root.go b/cmd/tink-worker/cmd/root.go index 24c289a6f..679b13f07 100644 --- a/cmd/tink-worker/cmd/root.go +++ b/cmd/tink-worker/cmd/root.go @@ -48,6 +48,7 @@ func NewRootCommand(version string, logger log.Logger) *cobra.Command { user, _ := cmd.Flags().GetString("registry-username") pwd, _ := cmd.Flags().GetString("registry-password") registry, _ := cmd.Flags().GetString("docker-registry") + captureActionLogs, _ := cmd.Flags().GetBool("capture-action-logs") logger.With("version", version).Info("starting") if setupErr := client.Setup(); setupErr != nil { @@ -70,7 +71,7 @@ func NewRootCommand(version string, logger log.Logger) *cobra.Command { regConn := internal.NewRegistryConnDetails(registry, user, pwd, logger) worker := internal.NewWorker(rClient, regConn, logger, registry, retries, retryInterval, maxFileSize) - err = worker.ProcessWorkflowActions(ctx, workerID) + err = worker.ProcessWorkflowActions(ctx, workerID, captureActionLogs) if err != nil { return errors.Wrap(err, "worker Finished with error") } @@ -86,6 +87,8 @@ func NewRootCommand(version string, logger log.Logger) *cobra.Command { rootCmd.Flags().Int64("max-file-size", defaultMaxFileSize, "Maximum file size in bytes (MAX_FILE_SIZE)") + rootCmd.Flags().Bool("capture-action-logs", true, "Capture action container output as part of worker logs") + // rootCmd.Flags().String("log-level", "info", "Sets the worker log level (panic, fatal, error, warn, info, debug, trace)") must := func(err error) { diff --git a/cmd/tink-worker/internal/worker.go b/cmd/tink-worker/internal/worker.go index 59efbfede..869a197bd 100644 --- a/cmd/tink-worker/internal/worker.go +++ b/cmd/tink-worker/internal/worker.go @@ -94,7 +94,7 @@ func (w *Worker) captureLogs(ctx context.Context, id string) { } } -func (w *Worker) execute(ctx context.Context, wfID string, action *pb.WorkflowAction) (pb.State, error) { +func (w *Worker) execute(ctx context.Context, wfID string, action *pb.WorkflowAction, captureLogs bool) (pb.State, error) { l := w.logger.With("workflowID", wfID, "workerID", action.GetWorkerId(), "actionName", action.GetName(), "actionImage", action.GetImage()) cli := w.registryClient @@ -124,8 +124,10 @@ func (w *Worker) execute(ctx context.Context, wfID string, action *pb.WorkflowAc failedActionStatus := make(chan pb.State) - // capturing logs of action container in a go-routine - go w.captureLogs(ctx, id) + if captureLogs { + // capturing logs of action container in a go-routine + go w.captureLogs(ctx, id) + } status, waitErr := waitContainer(timeCtx, cli, id) defer func() { @@ -182,7 +184,7 @@ func (w *Worker) execute(ctx context.Context, wfID string, action *pb.WorkflowAc } // ProcessWorkflowActions gets all Workflow contexts and processes their actions -func (w *Worker) ProcessWorkflowActions(ctx context.Context, workerID string) error { +func (w *Worker) ProcessWorkflowActions(ctx context.Context, workerID string, captureActionLogs bool) error { l := w.logger.With("workerID", workerID) for { @@ -292,7 +294,7 @@ func (w *Worker) ProcessWorkflowActions(ctx context.Context, workerID string) er // start executing the action start := time.Now() - status, err := w.execute(ctx, wfID, action) + status, err := w.execute(ctx, wfID, action, captureActionLogs) elapsed := time.Since(start) actionStatus := &pb.WorkflowActionStatus{ From 8f3432ef6382b7a2740508efeba5663e598e52f9 Mon Sep 17 00:00:00 2001 From: "James W. Brinkerhoff" Date: Thu, 25 Feb 2021 13:48:35 -0500 Subject: [PATCH 2/3] disable tty handling when creating container Signed-off-by: James W. Brinkerhoff --- cmd/tink-worker/internal/action.go | 7 ++++++- cmd/tink-worker/internal/worker.go | 8 +++++--- 2 files changed, 11 insertions(+), 4 deletions(-) diff --git a/cmd/tink-worker/internal/action.go b/cmd/tink-worker/internal/action.go index 7bcdc6f46..4bcf9ed42 100644 --- a/cmd/tink-worker/internal/action.go +++ b/cmd/tink-worker/internal/action.go @@ -21,7 +21,7 @@ const ( infoWaitFinished = "wait finished for failed or timeout container" ) -func (w *Worker) createContainer(ctx context.Context, cmd []string, wfID string, action *pb.WorkflowAction) (string, error) { +func (w *Worker) createContainer(ctx context.Context, cmd []string, wfID string, action *pb.WorkflowAction, captureLogs bool) (string, error) { registry := w.registry config := &container.Config{ Image: path.Join(registry, action.GetImage()), @@ -31,6 +31,11 @@ func (w *Worker) createContainer(ctx context.Context, cmd []string, wfID string, Tty: true, Env: action.GetEnvironment(), } + if !captureLogs { + config.AttachStdout = true + config.AttachStderr = true + config.Tty = false + } wfDir := filepath.Join(dataDir, wfID) hostConfig := &container.HostConfig{ diff --git a/cmd/tink-worker/internal/worker.go b/cmd/tink-worker/internal/worker.go index 869a197bd..4afaa7776 100644 --- a/cmd/tink-worker/internal/worker.go +++ b/cmd/tink-worker/internal/worker.go @@ -101,7 +101,7 @@ func (w *Worker) execute(ctx context.Context, wfID string, action *pb.WorkflowAc if err := w.regConn.pullImage(ctx, cli, action.GetImage()); err != nil { return pb.State_STATE_RUNNING, errors.Wrap(err, "DOCKER PULL") } - id, err := w.createContainer(ctx, action.Command, wfID, action) + id, err := w.createContainer(ctx, action.Command, wfID, action, captureLogs) if err != nil { return pb.State_STATE_RUNNING, errors.Wrap(err, "DOCKER CREATE") } @@ -143,12 +143,13 @@ func (w *Worker) execute(ctx context.Context, wfID string, action *pb.WorkflowAc l.With("status", status.String()).Info("container removed") if status != pb.State_STATE_SUCCESS { if status == pb.State_STATE_TIMEOUT && action.OnTimeout != nil { - id, err = w.createContainer(ctx, action.OnTimeout, wfID, action) + id, err = w.createContainer(ctx, action.OnTimeout, wfID, action, captureLogs) if err != nil { l.Error(errors.Wrap(err, errCreateContainer)) } l.With("containerID", id, "status", status.String(), "command", action.GetOnTimeout()).Info("container created") failedActionStatus := make(chan pb.State) + // TODD(jwb) Need to handle this capture logs as well go w.captureLogs(ctx, id) go waitFailedContainer(ctx, l, cli, id, failedActionStatus) err = startContainer(ctx, l, cli, id) @@ -159,11 +160,12 @@ func (w *Worker) execute(ctx context.Context, wfID string, action *pb.WorkflowAc l.With("status", onTimeoutStatus).Info("action timeout") } else { if action.OnFailure != nil { - id, err = w.createContainer(ctx, action.OnFailure, wfID, action) + id, err = w.createContainer(ctx, action.OnFailure, wfID, action, captureLogs) if err != nil { l.Error(errors.Wrap(err, errFailedToRunCmd)) } l.With("containerID", id, "actionStatus", status.String(), "command", action.GetOnFailure()).Info("container created") + // TODO(jwb) - Handle logging here too go w.captureLogs(ctx, id) go waitFailedContainer(ctx, l, cli, id, failedActionStatus) err = startContainer(ctx, l, cli, id) From f354b54951d76a5cdad6ed78ec849514c91ba4bf Mon Sep 17 00:00:00 2001 From: "James W. Brinkerhoff" Date: Wed, 3 Mar 2021 09:11:16 -0500 Subject: [PATCH 3/3] Wrap additional calls to capture logs Signed-off-by: James W. Brinkerhoff --- cmd/tink-worker/internal/worker.go | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/cmd/tink-worker/internal/worker.go b/cmd/tink-worker/internal/worker.go index 4afaa7776..947540c74 100644 --- a/cmd/tink-worker/internal/worker.go +++ b/cmd/tink-worker/internal/worker.go @@ -149,8 +149,9 @@ func (w *Worker) execute(ctx context.Context, wfID string, action *pb.WorkflowAc } l.With("containerID", id, "status", status.String(), "command", action.GetOnTimeout()).Info("container created") failedActionStatus := make(chan pb.State) - // TODD(jwb) Need to handle this capture logs as well - go w.captureLogs(ctx, id) + if captureLogs { + go w.captureLogs(ctx, id) + } go waitFailedContainer(ctx, l, cli, id, failedActionStatus) err = startContainer(ctx, l, cli, id) if err != nil { @@ -165,8 +166,9 @@ func (w *Worker) execute(ctx context.Context, wfID string, action *pb.WorkflowAc l.Error(errors.Wrap(err, errFailedToRunCmd)) } l.With("containerID", id, "actionStatus", status.String(), "command", action.GetOnFailure()).Info("container created") - // TODO(jwb) - Handle logging here too - go w.captureLogs(ctx, id) + if captureLogs { + go w.captureLogs(ctx, id) + } go waitFailedContainer(ctx, l, cli, id, failedActionStatus) err = startContainer(ctx, l, cli, id) if err != nil {