Skip to content

Commit

Permalink
Add flag to allow you to disable tink-worker action logging rollup (#443
Browse files Browse the repository at this point in the history
)

## Description

Currently tink-worker attaches to the 'console' of each action container and logs its STDOUT/STDERR alongside tink-worker's normal output.    This change allows this behavior to be disabled through a flag.

## Why is this needed

We wish to log all action output centrally via syslog as configured in the docker 'daemon.json'.    If we enable that, and tink-worker still wants to grab the action logs the conflict will cause actions to fail to execute.
  • Loading branch information
mergify[bot] authored Mar 3, 2021
2 parents db0edc3 + f354b54 commit 5e1f0fd
Show file tree
Hide file tree
Showing 3 changed files with 26 additions and 12 deletions.
5 changes: 4 additions & 1 deletion cmd/tink-worker/cmd/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ func NewRootCommand(version string, logger log.Logger) *cobra.Command {
user, _ := cmd.Flags().GetString("registry-username")
pwd, _ := cmd.Flags().GetString("registry-password")
registry, _ := cmd.Flags().GetString("docker-registry")
captureActionLogs, _ := cmd.Flags().GetBool("capture-action-logs")

logger.With("version", version).Info("starting")
if setupErr := client.Setup(); setupErr != nil {
Expand All @@ -70,7 +71,7 @@ func NewRootCommand(version string, logger log.Logger) *cobra.Command {
regConn := internal.NewRegistryConnDetails(registry, user, pwd, logger)
worker := internal.NewWorker(rClient, regConn, logger, registry, retries, retryInterval, maxFileSize)

err = worker.ProcessWorkflowActions(ctx, workerID)
err = worker.ProcessWorkflowActions(ctx, workerID, captureActionLogs)
if err != nil {
return errors.Wrap(err, "worker Finished with error")
}
Expand All @@ -86,6 +87,8 @@ func NewRootCommand(version string, logger log.Logger) *cobra.Command {

rootCmd.Flags().Int64("max-file-size", defaultMaxFileSize, "Maximum file size in bytes (MAX_FILE_SIZE)")

rootCmd.Flags().Bool("capture-action-logs", true, "Capture action container output as part of worker logs")

// rootCmd.Flags().String("log-level", "info", "Sets the worker log level (panic, fatal, error, warn, info, debug, trace)")

must := func(err error) {
Expand Down
7 changes: 6 additions & 1 deletion cmd/tink-worker/internal/action.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ const (
infoWaitFinished = "wait finished for failed or timeout container"
)

func (w *Worker) createContainer(ctx context.Context, cmd []string, wfID string, action *pb.WorkflowAction) (string, error) {
func (w *Worker) createContainer(ctx context.Context, cmd []string, wfID string, action *pb.WorkflowAction, captureLogs bool) (string, error) {
registry := w.registry
config := &container.Config{
Image: path.Join(registry, action.GetImage()),
Expand All @@ -31,6 +31,11 @@ func (w *Worker) createContainer(ctx context.Context, cmd []string, wfID string,
Tty: true,
Env: action.GetEnvironment(),
}
if !captureLogs {
config.AttachStdout = true
config.AttachStderr = true
config.Tty = false
}

wfDir := filepath.Join(dataDir, wfID)
hostConfig := &container.HostConfig{
Expand Down
26 changes: 16 additions & 10 deletions cmd/tink-worker/internal/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,14 +94,14 @@ func (w *Worker) captureLogs(ctx context.Context, id string) {
}
}

func (w *Worker) execute(ctx context.Context, wfID string, action *pb.WorkflowAction) (pb.State, error) {
func (w *Worker) execute(ctx context.Context, wfID string, action *pb.WorkflowAction, captureLogs bool) (pb.State, error) {
l := w.logger.With("workflowID", wfID, "workerID", action.GetWorkerId(), "actionName", action.GetName(), "actionImage", action.GetImage())

cli := w.registryClient
if err := w.regConn.pullImage(ctx, cli, action.GetImage()); err != nil {
return pb.State_STATE_RUNNING, errors.Wrap(err, "DOCKER PULL")
}
id, err := w.createContainer(ctx, action.Command, wfID, action)
id, err := w.createContainer(ctx, action.Command, wfID, action, captureLogs)
if err != nil {
return pb.State_STATE_RUNNING, errors.Wrap(err, "DOCKER CREATE")
}
Expand All @@ -124,8 +124,10 @@ func (w *Worker) execute(ctx context.Context, wfID string, action *pb.WorkflowAc

failedActionStatus := make(chan pb.State)

// capturing logs of action container in a go-routine
go w.captureLogs(ctx, id)
if captureLogs {
// capturing logs of action container in a go-routine
go w.captureLogs(ctx, id)
}

status, waitErr := waitContainer(timeCtx, cli, id)
defer func() {
Expand All @@ -141,13 +143,15 @@ func (w *Worker) execute(ctx context.Context, wfID string, action *pb.WorkflowAc
l.With("status", status.String()).Info("container removed")
if status != pb.State_STATE_SUCCESS {
if status == pb.State_STATE_TIMEOUT && action.OnTimeout != nil {
id, err = w.createContainer(ctx, action.OnTimeout, wfID, action)
id, err = w.createContainer(ctx, action.OnTimeout, wfID, action, captureLogs)
if err != nil {
l.Error(errors.Wrap(err, errCreateContainer))
}
l.With("containerID", id, "status", status.String(), "command", action.GetOnTimeout()).Info("container created")
failedActionStatus := make(chan pb.State)
go w.captureLogs(ctx, id)
if captureLogs {
go w.captureLogs(ctx, id)
}
go waitFailedContainer(ctx, l, cli, id, failedActionStatus)
err = startContainer(ctx, l, cli, id)
if err != nil {
Expand All @@ -157,12 +161,14 @@ func (w *Worker) execute(ctx context.Context, wfID string, action *pb.WorkflowAc
l.With("status", onTimeoutStatus).Info("action timeout")
} else {
if action.OnFailure != nil {
id, err = w.createContainer(ctx, action.OnFailure, wfID, action)
id, err = w.createContainer(ctx, action.OnFailure, wfID, action, captureLogs)
if err != nil {
l.Error(errors.Wrap(err, errFailedToRunCmd))
}
l.With("containerID", id, "actionStatus", status.String(), "command", action.GetOnFailure()).Info("container created")
go w.captureLogs(ctx, id)
if captureLogs {
go w.captureLogs(ctx, id)
}
go waitFailedContainer(ctx, l, cli, id, failedActionStatus)
err = startContainer(ctx, l, cli, id)
if err != nil {
Expand All @@ -182,7 +188,7 @@ func (w *Worker) execute(ctx context.Context, wfID string, action *pb.WorkflowAc
}

// ProcessWorkflowActions gets all Workflow contexts and processes their actions
func (w *Worker) ProcessWorkflowActions(ctx context.Context, workerID string) error {
func (w *Worker) ProcessWorkflowActions(ctx context.Context, workerID string, captureActionLogs bool) error {
l := w.logger.With("workerID", workerID)

for {
Expand Down Expand Up @@ -292,7 +298,7 @@ func (w *Worker) ProcessWorkflowActions(ctx context.Context, workerID string) er

// start executing the action
start := time.Now()
status, err := w.execute(ctx, wfID, action)
status, err := w.execute(ctx, wfID, action, captureActionLogs)
elapsed := time.Since(start)

actionStatus := &pb.WorkflowActionStatus{
Expand Down

0 comments on commit 5e1f0fd

Please sign in to comment.