diff --git a/cmd/tink-worker/cmd/root.go b/cmd/tink-worker/cmd/root.go index 24c289a6f..679b13f07 100644 --- a/cmd/tink-worker/cmd/root.go +++ b/cmd/tink-worker/cmd/root.go @@ -48,6 +48,7 @@ func NewRootCommand(version string, logger log.Logger) *cobra.Command { user, _ := cmd.Flags().GetString("registry-username") pwd, _ := cmd.Flags().GetString("registry-password") registry, _ := cmd.Flags().GetString("docker-registry") + captureActionLogs, _ := cmd.Flags().GetBool("capture-action-logs") logger.With("version", version).Info("starting") if setupErr := client.Setup(); setupErr != nil { @@ -70,7 +71,7 @@ func NewRootCommand(version string, logger log.Logger) *cobra.Command { regConn := internal.NewRegistryConnDetails(registry, user, pwd, logger) worker := internal.NewWorker(rClient, regConn, logger, registry, retries, retryInterval, maxFileSize) - err = worker.ProcessWorkflowActions(ctx, workerID) + err = worker.ProcessWorkflowActions(ctx, workerID, captureActionLogs) if err != nil { return errors.Wrap(err, "worker Finished with error") } @@ -86,6 +87,8 @@ func NewRootCommand(version string, logger log.Logger) *cobra.Command { rootCmd.Flags().Int64("max-file-size", defaultMaxFileSize, "Maximum file size in bytes (MAX_FILE_SIZE)") + rootCmd.Flags().Bool("capture-action-logs", true, "Capture action container output as part of worker logs") + // rootCmd.Flags().String("log-level", "info", "Sets the worker log level (panic, fatal, error, warn, info, debug, trace)") must := func(err error) { diff --git a/cmd/tink-worker/internal/action.go b/cmd/tink-worker/internal/action.go index 7bcdc6f46..4bcf9ed42 100644 --- a/cmd/tink-worker/internal/action.go +++ b/cmd/tink-worker/internal/action.go @@ -21,7 +21,7 @@ const ( infoWaitFinished = "wait finished for failed or timeout container" ) -func (w *Worker) createContainer(ctx context.Context, cmd []string, wfID string, action *pb.WorkflowAction) (string, error) { +func (w *Worker) createContainer(ctx context.Context, cmd []string, wfID string, action *pb.WorkflowAction, captureLogs bool) (string, error) { registry := w.registry config := &container.Config{ Image: path.Join(registry, action.GetImage()), @@ -31,6 +31,11 @@ func (w *Worker) createContainer(ctx context.Context, cmd []string, wfID string, Tty: true, Env: action.GetEnvironment(), } + if !captureLogs { + config.AttachStdout = true + config.AttachStderr = true + config.Tty = false + } wfDir := filepath.Join(dataDir, wfID) hostConfig := &container.HostConfig{ diff --git a/cmd/tink-worker/internal/worker.go b/cmd/tink-worker/internal/worker.go index 59efbfede..947540c74 100644 --- a/cmd/tink-worker/internal/worker.go +++ b/cmd/tink-worker/internal/worker.go @@ -94,14 +94,14 @@ func (w *Worker) captureLogs(ctx context.Context, id string) { } } -func (w *Worker) execute(ctx context.Context, wfID string, action *pb.WorkflowAction) (pb.State, error) { +func (w *Worker) execute(ctx context.Context, wfID string, action *pb.WorkflowAction, captureLogs bool) (pb.State, error) { l := w.logger.With("workflowID", wfID, "workerID", action.GetWorkerId(), "actionName", action.GetName(), "actionImage", action.GetImage()) cli := w.registryClient if err := w.regConn.pullImage(ctx, cli, action.GetImage()); err != nil { return pb.State_STATE_RUNNING, errors.Wrap(err, "DOCKER PULL") } - id, err := w.createContainer(ctx, action.Command, wfID, action) + id, err := w.createContainer(ctx, action.Command, wfID, action, captureLogs) if err != nil { return pb.State_STATE_RUNNING, errors.Wrap(err, "DOCKER CREATE") } @@ -124,8 +124,10 @@ func (w *Worker) execute(ctx context.Context, wfID string, action *pb.WorkflowAc failedActionStatus := make(chan pb.State) - // capturing logs of action container in a go-routine - go w.captureLogs(ctx, id) + if captureLogs { + // capturing logs of action container in a go-routine + go w.captureLogs(ctx, id) + } status, waitErr := waitContainer(timeCtx, cli, id) defer func() { @@ -141,13 +143,15 @@ func (w *Worker) execute(ctx context.Context, wfID string, action *pb.WorkflowAc l.With("status", status.String()).Info("container removed") if status != pb.State_STATE_SUCCESS { if status == pb.State_STATE_TIMEOUT && action.OnTimeout != nil { - id, err = w.createContainer(ctx, action.OnTimeout, wfID, action) + id, err = w.createContainer(ctx, action.OnTimeout, wfID, action, captureLogs) if err != nil { l.Error(errors.Wrap(err, errCreateContainer)) } l.With("containerID", id, "status", status.String(), "command", action.GetOnTimeout()).Info("container created") failedActionStatus := make(chan pb.State) - go w.captureLogs(ctx, id) + if captureLogs { + go w.captureLogs(ctx, id) + } go waitFailedContainer(ctx, l, cli, id, failedActionStatus) err = startContainer(ctx, l, cli, id) if err != nil { @@ -157,12 +161,14 @@ func (w *Worker) execute(ctx context.Context, wfID string, action *pb.WorkflowAc l.With("status", onTimeoutStatus).Info("action timeout") } else { if action.OnFailure != nil { - id, err = w.createContainer(ctx, action.OnFailure, wfID, action) + id, err = w.createContainer(ctx, action.OnFailure, wfID, action, captureLogs) if err != nil { l.Error(errors.Wrap(err, errFailedToRunCmd)) } l.With("containerID", id, "actionStatus", status.String(), "command", action.GetOnFailure()).Info("container created") - go w.captureLogs(ctx, id) + if captureLogs { + go w.captureLogs(ctx, id) + } go waitFailedContainer(ctx, l, cli, id, failedActionStatus) err = startContainer(ctx, l, cli, id) if err != nil { @@ -182,7 +188,7 @@ func (w *Worker) execute(ctx context.Context, wfID string, action *pb.WorkflowAc } // ProcessWorkflowActions gets all Workflow contexts and processes their actions -func (w *Worker) ProcessWorkflowActions(ctx context.Context, workerID string) error { +func (w *Worker) ProcessWorkflowActions(ctx context.Context, workerID string, captureActionLogs bool) error { l := w.logger.With("workerID", workerID) for { @@ -292,7 +298,7 @@ func (w *Worker) ProcessWorkflowActions(ctx context.Context, workerID string) er // start executing the action start := time.Now() - status, err := w.execute(ctx, wfID, action) + status, err := w.execute(ctx, wfID, action, captureActionLogs) elapsed := time.Since(start) actionStatus := &pb.WorkflowActionStatus{