From c5db740adbd10acdec0b7e2f4e5d30b52e759b17 Mon Sep 17 00:00:00 2001 From: Marques Johansson Date: Tue, 22 Sep 2020 23:54:22 -0400 Subject: [PATCH 1/4] worker: (broken) move action and worker to internal/ for cleaner git history Signed-off-by: Marques Johansson --- cmd/tink-worker/{ => internal}/action.go | 2 +- cmd/tink-worker/{ => internal}/worker.go | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) rename cmd/tink-worker/{ => internal}/action.go (99%) rename cmd/tink-worker/{ => internal}/worker.go (99%) diff --git a/cmd/tink-worker/action.go b/cmd/tink-worker/internal/action.go similarity index 99% rename from cmd/tink-worker/action.go rename to cmd/tink-worker/internal/action.go index f06ff2f26..1378b3aca 100644 --- a/cmd/tink-worker/action.go +++ b/cmd/tink-worker/internal/action.go @@ -1,4 +1,4 @@ -package main +package internal import ( "bufio" diff --git a/cmd/tink-worker/worker.go b/cmd/tink-worker/internal/worker.go similarity index 99% rename from cmd/tink-worker/worker.go rename to cmd/tink-worker/internal/worker.go index c4c88d0fb..9fa4d19f6 100644 --- a/cmd/tink-worker/worker.go +++ b/cmd/tink-worker/internal/worker.go @@ -1,4 +1,4 @@ -package main +package internal import ( "context" From 01bc0c5b92e64eca97712f1e3eadce7dafcfdbd6 Mon Sep 17 00:00:00 2001 From: Marques Johansson Date: Sat, 5 Sep 2020 23:01:54 -0400 Subject: [PATCH 2/4] tink worker command line args Signed-off-by: Marques Johansson --- cmd/tink-worker/cmd/root.go | 173 +++++++++++++++++++++ cmd/tink-worker/internal/action.go | 211 +++----------------------- cmd/tink-worker/internal/registry.go | 71 +++++++++ cmd/tink-worker/internal/worker.go | 218 +++++++++++++++++++++------ cmd/tink-worker/main.go | 86 +---------- go.mod | 4 + http-server/http_handlers.go | 6 + 7 files changed, 451 insertions(+), 318 deletions(-) create mode 100644 cmd/tink-worker/cmd/root.go create mode 100644 cmd/tink-worker/internal/registry.go diff --git a/cmd/tink-worker/cmd/root.go b/cmd/tink-worker/cmd/root.go new file mode 100644 index 000000000..39ec6508f --- /dev/null +++ b/cmd/tink-worker/cmd/root.go @@ -0,0 +1,173 @@ +package cmd + +import ( + "context" + "fmt" + "os" + "strings" + "time" + + "github.com/packethost/pkg/log" + "github.com/pkg/errors" + "github.com/spf13/cobra" + "github.com/spf13/pflag" + "github.com/spf13/viper" + "github.com/tinkerbell/tink/client" + "github.com/tinkerbell/tink/cmd/tink-worker/internal" + pb "github.com/tinkerbell/tink/protos/workflow" + "google.golang.org/grpc" +) + +const ( + retryIntervalDefault = 3 + retryCountDefault = 3 + defaultMaxFileSize int64 = 10485760 //10MB ~= 10485760Bytes + defaultTimeoutMinutes = 60 +) + +// NewRootCommand creates a new Tink Worker Cobra root command +func NewRootCommand(version string, logger log.Logger) *cobra.Command { + must := func(err error) { + if err != nil { + logger.Fatal(err) + } + } + + rootCmd := &cobra.Command{ + Use: "tink-worker", + Short: "Tink Worker", + Version: version, + PersistentPreRunE: func(cmd *cobra.Command, args []string) error { + viper, err := createViper() + if err != nil { + return err + } + return applyViper(viper, cmd) + }, + RunE: func(cmd *cobra.Command, args []string) error { + retryInterval, _ := cmd.PersistentFlags().GetDuration("retry-interval") + retries, _ := cmd.PersistentFlags().GetInt("retries") + // TODO(displague) is log-level no longer useful? + // logLevel, _ := cmd.PersistentFlags().GetString("log-level") + workerID, _ := cmd.PersistentFlags().GetString("id") + maxFileSize, _ := cmd.PersistentFlags().GetInt64("max-file-size") + timeOut, _ := cmd.PersistentFlags().GetDuration("timeout") + user, _ := cmd.PersistentFlags().GetString("registry-username") + pwd, _ := cmd.PersistentFlags().GetString("registry-password") + registry, _ := cmd.PersistentFlags().GetString("docker-registry") + + logger.With("version", version).Info("starting") + if setupErr := client.Setup(); setupErr != nil { + return setupErr + } + + ctx := context.Background() + if timeOut > 0 { + var cancel context.CancelFunc + ctx, cancel = context.WithTimeout(ctx, timeOut) + defer cancel() + } + + conn, err := tryClientConnection(logger, retryInterval, retries) + if err != nil { + return err + } + rClient := pb.NewWorkflowSvcClient(conn) + + regConn := internal.NewRegistryConnDetails(registry, user, pwd, logger) + worker := internal.NewWorker(rClient, regConn, logger, registry, retries, retryInterval, maxFileSize) + + err = worker.ProcessWorkflowActions(ctx, workerID) + if err != nil { + return errors.Wrap(err, "worker Finished with error") + } + return nil + }, + } + + rootCmd.PersistentFlags().Duration("retry-interval", retryIntervalDefault, "Retry interval in seconds") + + rootCmd.PersistentFlags().Duration("timeout", time.Duration(defaultTimeoutMinutes*time.Minute), "Max duration to wait for worker to complete") + + rootCmd.PersistentFlags().Int("max-retry", retryCountDefault, "Maximum number of retries to attempt") + + rootCmd.PersistentFlags().Int64("max-file-size", defaultMaxFileSize, "Maximum file size in bytes") + + // rootCmd.PersistentFlags().String("log-level", "info", "Sets the worker log level (panic, fatal, error, warn, info, debug, trace)") + + rootCmd.PersistentFlags().StringP("id", "i", "", "Sets the worker id") + must(rootCmd.MarkPersistentFlagRequired("id")) + + rootCmd.PersistentFlags().StringP("docker-registry", "r", "", "Sets the Docker registry") + must(rootCmd.MarkPersistentFlagRequired("docker-registry")) + + rootCmd.PersistentFlags().StringP("registry-username", "u", "", "Sets the registry username") + must(rootCmd.MarkPersistentFlagRequired("registry-username")) + + rootCmd.PersistentFlags().StringP("registry-password", "p", "", "Sets the registry-password") + must(rootCmd.MarkPersistentFlagRequired("registry-password")) + + return rootCmd +} + +// createViper creates a Viper object configured to read in configuration files +// (from various paths with content type specific filename extensions) and load +// environment variables that start with TINK_WORKER. +func createViper() (*viper.Viper, error) { + v := viper.New() + v.AutomaticEnv() + v.SetConfigName("tink-worker") + v.AddConfigPath("/etc/tinkerbell") + v.AddConfigPath(".") + v.SetEnvPrefix("TINK_WORKER") + v.SetEnvKeyReplacer(strings.NewReplacer("-", "_")) + + // If a config file is found, read it in. + if err := v.ReadInConfig(); err != nil { + if _, ok := err.(viper.ConfigFileNotFoundError); !ok { + return nil, err + } + } else { + fmt.Fprintln(os.Stderr, "Using config file:", v.ConfigFileUsed()) + } + + return v, nil +} + +func applyViper(v *viper.Viper, cmd *cobra.Command) error { + errors := []error{} + + cmd.PersistentFlags().VisitAll(func(f *pflag.Flag) { + if !f.Changed && v.IsSet(f.Name) { + val := v.Get(f.Name) + if err := cmd.Flags().Set(f.Name, fmt.Sprintf("%v", val)); err != nil { + errors = append(errors, err) + return + } + } + }) + + if len(errors) > 0 { + errs := []string{} + for _, err := range errors { + errs = append(errs, err.Error()) + } + return fmt.Errorf(strings.Join(errs, ", ")) + } + + return nil +} + +func tryClientConnection(logger log.Logger, retryInterval time.Duration, retries int) (*grpc.ClientConn, error) { + for ; retries > 0; retries-- { + c, err := client.GetConnection() + if err != nil { + logger.With("error", err, "duration", retryInterval).Info("failed to connect, sleeping before retrying") + <-time.After(retryInterval * time.Second) + continue + } + + return c, nil + } + return nil, fmt.Errorf("retries exceeded") +} diff --git a/cmd/tink-worker/internal/action.go b/cmd/tink-worker/internal/action.go index 1378b3aca..b9815b645 100644 --- a/cmd/tink-worker/internal/action.go +++ b/cmd/tink-worker/internal/action.go @@ -1,14 +1,9 @@ package internal import ( - "bufio" "context" - "encoding/base64" - "encoding/json" - "fmt" - "io" - "os" - "time" + "path" + "path/filepath" "github.com/docker/docker/api/types" "github.com/docker/docker/api/types/container" @@ -18,199 +13,48 @@ import ( pb "github.com/tinkerbell/tink/protos/workflow" ) -var ( - registry string - cli *client.Client -) - const ( + errContextClosed = "failed to wait for container, context closed" errCreateContainer = "failed to create container" - errRemoveContainer = "failed to remove container" errFailedToWait = "failed to wait for completion of action" errFailedToRunCmd = "failed to run on-timeout command" infoWaitFinished = "wait finished for failed or timeout container" ) -func executeAction(ctx context.Context, action *pb.WorkflowAction, wfID string) (pb.ActionState, error) { - l := logger.With("workflowID", wfID, "workerID", action.GetWorkerId(), "actionName", action.GetName(), "actionImage", action.GetImage()) - err := pullActionImage(ctx, action) - if err != nil { - return pb.ActionState_ACTION_IN_PROGRESS, errors.Wrap(err, "DOCKER PULL") - } - id, err := createContainer(ctx, l, action, action.Command, wfID) - if err != nil { - return pb.ActionState_ACTION_IN_PROGRESS, errors.Wrap(err, "DOCKER CREATE") - } - l.With("containerID", id, "command", action.GetOnTimeout()).Info("container created") - // Setting time context for action - timeCtx := ctx - if action.Timeout > 0 { - var cancel context.CancelFunc - timeCtx, cancel = context.WithTimeout(ctx, time.Duration(action.Timeout)*time.Second) - defer cancel() - } - err = startContainer(timeCtx, l, id) - if err != nil { - return pb.ActionState_ACTION_IN_PROGRESS, errors.Wrap(err, "DOCKER RUN") - } - - failedActionStatus := make(chan pb.ActionState) - - //capturing logs of action container in a go-routine - go captureLogs(ctx, id) - - status, err := waitContainer(timeCtx, id) - if err != nil { - rerr := removeContainer(ctx, l, id) - if rerr != nil { - rerr = errors.Wrap(rerr, errRemoveContainer) - l.With("containerID", id).Error(rerr) - return status, rerr - } - return status, errors.Wrap(err, "DOCKER_WAIT") - } - rerr := removeContainer(ctx, l, id) - if rerr != nil { - return status, errors.Wrap(rerr, "DOCKER_REMOVE") - } - l.With("status", status.String()).Info("container removed") - if status != pb.ActionState_ACTION_SUCCESS { - if status == pb.ActionState_ACTION_TIMEOUT && action.OnTimeout != nil { - id, err = createContainer(ctx, l, action, action.OnTimeout, wfID) - if err != nil { - l.Error(errors.Wrap(err, errCreateContainer)) - } - l.With("containerID", id, "status", status.String(), "command", action.GetOnTimeout()).Info("container created") - failedActionStatus := make(chan pb.ActionState) - go captureLogs(ctx, id) - go waitFailedContainer(ctx, id, failedActionStatus) - err = startContainer(ctx, l, id) - if err != nil { - l.Error(errors.Wrap(err, errFailedToRunCmd)) - } - onTimeoutStatus := <-failedActionStatus - l.With("status", onTimeoutStatus).Info("action timeout") - } else { - if action.OnFailure != nil { - id, err = createContainer(ctx, l, action, action.OnFailure, wfID) - if err != nil { - l.Error(errors.Wrap(err, errFailedToRunCmd)) - } - l.With("containerID", id, "actionStatus", status.String(), "command", action.GetOnFailure()).Info("container created") - go captureLogs(ctx, id) - go waitFailedContainer(ctx, id, failedActionStatus) - err = startContainer(ctx, l, id) - if err != nil { - l.Error(errors.Wrap(err, errFailedToRunCmd)) - } - onFailureStatus := <-failedActionStatus - l.With("status", onFailureStatus).Info("action failed") - } - } - l.Info(infoWaitFinished) - if err != nil { - rerr := removeContainer(ctx, l, id) - if rerr != nil { - l.Error(errors.Wrap(rerr, errRemoveContainer)) - } - l.Error(errors.Wrap(err, errFailedToWait)) - } - rerr = removeContainer(ctx, l, id) - if rerr != nil { - l.Error(errors.Wrap(rerr, errRemoveContainer)) - } - } - l.With("status", status).Info("action container exited") - return status, nil -} - -func captureLogs(ctx context.Context, id string) { - reader, err := cli.ContainerLogs(context.Background(), id, types.ContainerLogsOptions{ - ShowStdout: true, - ShowStderr: true, - Follow: true, - Timestamps: false, - }) - if err != nil { - panic(err) - } - defer reader.Close() - - scanner := bufio.NewScanner(reader) - for scanner.Scan() { - fmt.Println(scanner.Text()) - } -} - -func pullActionImage(ctx context.Context, action *pb.WorkflowAction) error { - user := os.Getenv("REGISTRY_USERNAME") - pwd := os.Getenv("REGISTRY_PASSWORD") - if user == "" || pwd == "" { - return errors.New("required REGISTRY_USERNAME and REGISTRY_PASSWORD") - } - - authConfig := types.AuthConfig{ - Username: user, - Password: pwd, - ServerAddress: registry, - } - encodedJSON, err := json.Marshal(authConfig) - if err != nil { - return errors.Wrap(err, "DOCKER AUTH") - } - authStr := base64.URLEncoding.EncodeToString(encodedJSON) - - out, err := cli.ImagePull(ctx, registry+"/"+action.GetImage(), types.ImagePullOptions{RegistryAuth: authStr}) - if err != nil { - return errors.Wrap(err, "DOCKER PULL") - } - defer out.Close() - if _, err := io.Copy(os.Stdout, out); err != nil { - return err - } - return nil -} - -func createContainer(ctx context.Context, l log.Logger, action *pb.WorkflowAction, cmd []string, wfID string) (string, error) { +func (w *Worker) createContainer(ctx context.Context, cmd []string, wfID string, action *pb.WorkflowAction) (string, error) { + registry := w.registry config := &container.Config{ - Image: registry + "/" + action.GetImage(), + Image: path.Join(registry, action.GetImage()), AttachStdout: true, AttachStderr: true, + Cmd: cmd, Tty: true, Env: action.GetEnvironment(), } - if cmd != nil { - config.Cmd = cmd - } - wfDir := dataDir + string(os.PathSeparator) + wfID + wfDir := filepath.Join(dataDir, wfID) hostConfig := &container.HostConfig{ Privileged: true, Binds: []string{wfDir + ":/workflow"}, } hostConfig.Binds = append(hostConfig.Binds, action.GetVolumes()...) - l.With("command", cmd).Info("creating container") - resp, err := cli.ContainerCreate(ctx, config, hostConfig, nil, action.GetName()) + w.logger.With("command", cmd).Info("creating container") + resp, err := w.registryClient.ContainerCreate(ctx, config, hostConfig, nil, action.GetName()) if err != nil { return "", errors.Wrap(err, "DOCKER CREATE") } return resp.ID, nil } -func startContainer(ctx context.Context, l log.Logger, id string) error { +func startContainer(ctx context.Context, l log.Logger, cli *client.Client, id string) error { l.With("containerID", id).Debug("starting container") - err := cli.ContainerStart(ctx, id, types.ContainerStartOptions{}) - if err != nil { - return errors.Wrap(err, "DOCKER START") - } - return nil + return errors.Wrap(cli.ContainerStart(ctx, id, types.ContainerStartOptions{}), "DOCKER START") } -func waitContainer(ctx context.Context, id string) (pb.ActionState, error) { +func waitContainer(ctx context.Context, cli *client.Client, id string) (pb.ActionState, error) { // Inspect whether the container is in running state - _, err := cli.ContainerInspect(ctx, id) - if err != nil { + if _, err := cli.ContainerInspect(ctx, id); err != nil { return pb.ActionState_ACTION_FAILED, nil } @@ -230,7 +74,7 @@ func waitContainer(ctx context.Context, id string) (pb.ActionState, error) { } } -func waitFailedContainer(ctx context.Context, id string, failedActionStatus chan pb.ActionState) { +func waitFailedContainer(ctx context.Context, l log.Logger, cli *client.Client, id string, failedActionStatus chan pb.ActionState) { // send API call to wait for the container completion wait, errC := cli.ContainerWait(ctx, id, container.WaitConditionNotRunning) @@ -241,12 +85,15 @@ func waitFailedContainer(ctx context.Context, id string, failedActionStatus chan } failedActionStatus <- pb.ActionState_ACTION_FAILED case err := <-errC: - logger.Error(err) + l.Error(err) failedActionStatus <- pb.ActionState_ACTION_FAILED + case <-ctx.Done(): + l.Error(ctx.Err()) + failedActionStatus <- pb.ActionState_ACTION_TIMEOUT } } -func removeContainer(ctx context.Context, l log.Logger, id string) error { +func removeContainer(ctx context.Context, l log.Logger, cli *client.Client, id string) error { // create options for removing container opts := types.ContainerRemoveOptions{ Force: true, @@ -256,21 +103,5 @@ func removeContainer(ctx context.Context, l log.Logger, id string) error { l.With("containerID", id).Info("removing container") // send API call to remove the container - err := cli.ContainerRemove(ctx, id, opts) - if err != nil { - return err - } - return nil -} - -func initializeDockerClient() (*client.Client, error) { - registry = os.Getenv("DOCKER_REGISTRY") - if registry == "" { - return nil, errors.New("required DOCKER_REGISTRY") - } - c, err := client.NewClientWithOpts(client.FromEnv, client.WithAPIVersionNegotiation()) - if err != nil { - return nil, errors.Wrap(err, "DOCKER CLIENT") - } - return c, nil + return cli.ContainerRemove(ctx, id, opts) } diff --git a/cmd/tink-worker/internal/registry.go b/cmd/tink-worker/internal/registry.go new file mode 100644 index 000000000..a20f12e32 --- /dev/null +++ b/cmd/tink-worker/internal/registry.go @@ -0,0 +1,71 @@ +package internal + +import ( + "context" + "encoding/base64" + "encoding/json" + "io" + "os" + + "github.com/docker/docker/api/types" + "github.com/docker/docker/client" + "github.com/packethost/pkg/log" + "github.com/pkg/errors" +) + +// RegistryConnDetails are the connection details for accessing a Docker +// registry and logging activities +type RegistryConnDetails struct { + registry, + user, + pwd string + logger log.Logger +} + +// NewRegistryConnDetails creates a new RegistryConnDetails +func NewRegistryConnDetails(registry, user, pwd string, logger log.Logger) *RegistryConnDetails { + return &RegistryConnDetails{ + registry: registry, + user: user, + pwd: pwd, + logger: logger, + } +} + +// NewClient uses the RegistryConnDetails to create a new Docker Client +func (r *RegistryConnDetails) NewClient() (*client.Client, error) { + if r.registry == "" { + return nil, errors.New("required DOCKER_REGISTRY") + } + c, err := client.NewClientWithOpts(client.FromEnv, client.WithAPIVersionNegotiation()) + + if err != nil { + return nil, errors.Wrap(err, "DOCKER CLIENT") + } + + return c, nil +} + +// pullImage outputs to stdout the contents of the requested image (relative to the registry) +func (r *RegistryConnDetails) pullImage(ctx context.Context, cli *client.Client, image string) error { + authConfig := types.AuthConfig{ + Username: r.user, + Password: r.pwd, + ServerAddress: r.registry, + } + encodedJSON, err := json.Marshal(authConfig) + if err != nil { + return errors.Wrap(err, "DOCKER AUTH") + } + authStr := base64.URLEncoding.EncodeToString(encodedJSON) + + out, err := cli.ImagePull(ctx, r.registry+"/"+image, types.ImagePullOptions{RegistryAuth: authStr}) + if err != nil { + return errors.Wrap(err, "DOCKER PULL") + } + defer out.Close() + if _, err := io.Copy(os.Stdout, out); err != nil { + return err + } + return nil +} diff --git a/cmd/tink-worker/internal/worker.go b/cmd/tink-worker/internal/worker.go index 9fa4d19f6..aa2918b7f 100644 --- a/cmd/tink-worker/internal/worker.go +++ b/cmd/tink-worker/internal/worker.go @@ -1,6 +1,7 @@ package internal import ( + "bufio" "context" sha "crypto/sha256" "encoding/base64" @@ -12,6 +13,8 @@ import ( "strings" "time" + "github.com/docker/docker/api/types" + "github.com/docker/docker/client" "github.com/packethost/pkg/log" "github.com/pkg/errors" pb "github.com/tinkerbell/tink/protos/workflow" @@ -19,10 +22,8 @@ import ( ) const ( - dataFile = "data" - dataDir = "/worker" - maxFileSize = "MAX_FILE_SIZE" // in bytes - defaultMaxFileSize int64 = 10485760 //10MB ~= 10485760Bytes + dataFile = "data" + dataDir = "/worker" errGetWfContext = "failed to get workflow context" errGetWfActions = "failed to get actions for workflow" @@ -45,28 +46,155 @@ type WorkflowMetadata struct { SHA string `json:"sha256"` } -func processWorkflowActions(client pb.WorkflowSvcClient) error { - workerID := os.Getenv("WORKER_ID") - if workerID == "" { - return errors.New("required WORKER_ID") +// Worker details provide all the context needed to run a +type Worker struct { + client pb.WorkflowSvcClient + regConn *RegistryConnDetails + registryClient *client.Client + logger log.Logger + registry string + retries int + retryInterval time.Duration + maxSize int64 +} + +// NewWorker creates a new Worker, creating a new Docker registry client +func NewWorker(client pb.WorkflowSvcClient, regConn *RegistryConnDetails, logger log.Logger, registry string, retries int, retryInterval time.Duration, maxFileSize int64) *Worker { + registryClient, err := regConn.NewClient() + if err != nil { + panic(err) + } + return &Worker{ + client: client, + regConn: regConn, + registryClient: registryClient, + logger: logger, + registry: registry, + retries: retries, + retryInterval: retryInterval, + maxSize: maxFileSize, } +} - ctx := context.Background() - var err error - cli, err = initializeDockerClient() +func (w *Worker) captureLogs(ctx context.Context, id string) { + reader, err := w.registryClient.ContainerLogs(ctx, id, types.ContainerLogsOptions{ + ShowStdout: true, + ShowStderr: true, + Follow: true, + Timestamps: false, + }) + if err != nil { + panic(err) + } + defer reader.Close() + + scanner := bufio.NewScanner(reader) + for scanner.Scan() { + fmt.Println(scanner.Text()) + } +} + +func (w *Worker) execute(ctx context.Context, wfID string, action *pb.WorkflowAction) (pb.ActionState, error) { + l := w.logger.With("workflowID", wfID, "workerID", action.GetWorkerId(), "actionName", action.GetName(), "actionImage", action.GetImage()) + + cli := w.registryClient + err := w.regConn.pullImage(ctx, cli, action.GetImage()) + if err != nil { + return pb.ActionState_ACTION_IN_PROGRESS, errors.Wrap(err, "DOCKER PULL") + } + id, err := w.createContainer(ctx, action.Command, wfID, action) if err != nil { - return err + return pb.ActionState_ACTION_IN_PROGRESS, errors.Wrap(err, "DOCKER CREATE") } + l.With("containerID", id, "command", action.GetOnTimeout()).Info("container created") + + var timeCtx context.Context + var cancel context.CancelFunc + + if action.Timeout > 0 { + timeCtx, cancel = context.WithTimeout(ctx, time.Duration(action.Timeout)*time.Second) + } else { + timeCtx, cancel = context.WithTimeout(ctx, 1*time.Hour) + } + defer cancel() + + err = startContainer(timeCtx, l, cli, id) + if err != nil { + return pb.ActionState_ACTION_IN_PROGRESS, errors.Wrap(err, "DOCKER RUN") + } + + failedActionStatus := make(chan pb.ActionState) + + // capturing logs of action container in a go-routine + go w.captureLogs(ctx, id) + + status, waitErr := waitContainer(timeCtx, cli, id) + defer func() { + if removalErr := removeContainer(ctx, l, cli, id); removalErr != nil { + l.With("containerID", id).Error(removalErr) + } + }() + + if waitErr != nil { + return status, errors.Wrap(waitErr, "DOCKER_WAIT") + } + + l.With("status", status.String()).Info("container removed") + if status != pb.ActionState_ACTION_SUCCESS { + if status == pb.ActionState_ACTION_TIMEOUT && action.OnTimeout != nil { + id, err = w.createContainer(ctx, action.OnTimeout, wfID, action) + if err != nil { + l.Error(errors.Wrap(err, errCreateContainer)) + } + l.With("containerID", id, "status", status.String(), "command", action.GetOnTimeout()).Info("container created") + failedActionStatus := make(chan pb.ActionState) + go w.captureLogs(ctx, id) + go waitFailedContainer(ctx, l, cli, id, failedActionStatus) + err = startContainer(ctx, l, cli, id) + if err != nil { + l.Error(errors.Wrap(err, errFailedToRunCmd)) + } + onTimeoutStatus := <-failedActionStatus + l.With("status", onTimeoutStatus).Info("action timeout") + } else { + if action.OnFailure != nil { + id, err = w.createContainer(ctx, action.OnFailure, wfID, action) + if err != nil { + l.Error(errors.Wrap(err, errFailedToRunCmd)) + } + l.With("containerID", id, "actionStatus", status.String(), "command", action.GetOnFailure()).Info("container created") + go w.captureLogs(ctx, id) + go waitFailedContainer(ctx, l, cli, id, failedActionStatus) + err = startContainer(ctx, l, cli, id) + if err != nil { + l.Error(errors.Wrap(err, errFailedToRunCmd)) + } + onFailureStatus := <-failedActionStatus + l.With("status", onFailureStatus).Info("action failed") + } + } + l.Info(infoWaitFinished) + if err != nil { + l.Error(errors.Wrap(err, errFailedToWait)) + } + } + l.With("status", status).Info("action container exited") + return status, nil +} + +// ProcessWorkflowActions gets all Workflow contexts and processes their actions +func (w *Worker) ProcessWorkflowActions(ctx context.Context, workerID string) error { + l := w.logger.With("workerID", workerID) + for { - l := logger.With("workerID", workerID) - res, err := client.GetWorkflowContexts(ctx, &pb.WorkflowContextRequest{WorkerId: workerID}) + res, err := w.client.GetWorkflowContexts(ctx, &pb.WorkflowContextRequest{WorkerId: workerID}) if err != nil { return errors.Wrap(err, errGetWfContext) } for wfContext, err := res.Recv(); err == nil && wfContext != nil; wfContext, err = res.Recv() { wfID := wfContext.GetWorkflowId() l = l.With("workflowID", wfID) - actions, err := client.GetWorkflowActions(ctx, &pb.WorkflowActionsRequest{WorkflowId: wfID}) + actions, err := w.client.GetWorkflowActions(ctx, &pb.WorkflowActionsRequest{WorkflowId: wfID}) if err != nil { return errors.Wrap(err, errGetWfActions) } @@ -153,7 +281,7 @@ func processWorkflowActions(client pb.WorkflowSvcClient) error { WorkerId: action.GetWorkerId(), } - err := reportActionStatus(ctx, client, actionStatus) + err := w.reportActionStatus(ctx, actionStatus) if err != nil { exitWithGrpcError(err, l) } @@ -161,11 +289,11 @@ func processWorkflowActions(client pb.WorkflowSvcClient) error { } // get workflow data - getWorkflowData(ctx, client, workerID, wfID) + getWorkflowData(ctx, l, w.client, workerID, wfID) // start executing the action start := time.Now() - status, err := executeAction(ctx, actions.GetActionList()[actionIndex], wfID) + status, err := w.execute(ctx, wfID, action) elapsed := time.Since(start) actionStatus := &pb.WorkflowActionStatus{ @@ -184,9 +312,8 @@ func processWorkflowActions(client pb.WorkflowSvcClient) error { } l.With("actionStatus", actionStatus.ActionStatus.String()) l.Error(err) - rerr := reportActionStatus(ctx, client, actionStatus) - if rerr != nil { - exitWithGrpcError(rerr, l) + if reportErr := w.reportActionStatus(ctx, actionStatus); reportErr != nil { + exitWithGrpcError(reportErr, l) } delete(workflowcontexts, wfID) return err @@ -195,14 +322,14 @@ func processWorkflowActions(client pb.WorkflowSvcClient) error { actionStatus.ActionStatus = pb.ActionState_ACTION_SUCCESS actionStatus.Message = "finished execution successfully" - err = reportActionStatus(ctx, client, actionStatus) + err = w.reportActionStatus(ctx, actionStatus) if err != nil { exitWithGrpcError(err, l) } l.Info("sent action status") // send workflow data, if updated - updateWorkflowData(ctx, client, actionStatus) + w.updateWorkflowData(ctx, actionStatus) if len(actions.GetActionList()) == actionIndex+1 { l.Info("reached to end of workflow") @@ -219,8 +346,8 @@ func processWorkflowActions(client pb.WorkflowSvcClient) error { } } } - // sleep for 3 seconds before asking for new workflows - time.Sleep(retryInterval * time.Second) + // sleep before asking for new workflows + <-time.After(w.retryInterval * time.Second) } } @@ -236,19 +363,19 @@ func isLastAction(wfContext *pb.WorkflowContext, actions *pb.WorkflowActionList) return int(wfContext.GetCurrentActionIndex()) == len(actions.GetActionList())-1 } -func reportActionStatus(ctx context.Context, client pb.WorkflowSvcClient, actionStatus *pb.WorkflowActionStatus) error { - l := logger.With("workflowID", actionStatus.GetWorkflowId, +func (w *Worker) reportActionStatus(ctx context.Context, actionStatus *pb.WorkflowActionStatus) error { + l := w.logger.With("workflowID", actionStatus.GetWorkflowId, "workerID", actionStatus.GetWorkerId(), "actionName", actionStatus.GetActionName(), "taskName", actionStatus.GetTaskName(), ) var err error - for r := 1; r <= retries; r++ { - _, err = client.ReportActionStatus(ctx, actionStatus) + for r := 1; r <= w.retries; r++ { + _, err = w.client.ReportActionStatus(ctx, actionStatus) if err != nil { l.Error(errors.Wrap(err, errReportActionStatus)) - l.With("default", retryIntervalDefault).Info("RETRY_INTERVAL not set") - <-time.After(retryInterval * time.Second) + <-time.After(w.retryInterval * time.Second) + continue } return nil @@ -256,7 +383,7 @@ func reportActionStatus(ctx context.Context, client pb.WorkflowSvcClient, action return err } -func getWorkflowData(ctx context.Context, client pb.WorkflowSvcClient, workerID, workflowID string) { +func getWorkflowData(ctx context.Context, logger log.Logger, client pb.WorkflowSvcClient, workerID, workflowID string) { l := logger.With("workflowID", workflowID, "workerID", workerID, ) @@ -279,12 +406,13 @@ func getWorkflowData(ctx context.Context, client pb.WorkflowSvcClient, workerID, } } -func updateWorkflowData(ctx context.Context, client pb.WorkflowSvcClient, actionStatus *pb.WorkflowActionStatus) { - l := logger.With("workflowID", actionStatus.GetWorkflowId, +func (w *Worker) updateWorkflowData(ctx context.Context, actionStatus *pb.WorkflowActionStatus) { + l := w.logger.With("workflowID", actionStatus.GetWorkflowId, "workerID", actionStatus.GetWorkerId(), "actionName", actionStatus.GetActionName(), "taskName", actionStatus.GetTaskName(), ) + wfDir := dataDir + string(os.PathSeparator) + actionStatus.GetWorkflowId() f := openDataFile(wfDir, l) defer f.Close() @@ -294,22 +422,22 @@ func updateWorkflowData(ctx context.Context, client pb.WorkflowSvcClient, action l.Error(err) } - if isValidDataFile(f, data, l) { + if isValidDataFile(f, w.maxSize, data, l) { h := sha.New() if _, ok := workflowDataSHA[actionStatus.GetWorkflowId()]; !ok { checksum := base64.StdEncoding.EncodeToString(h.Sum(data)) workflowDataSHA[actionStatus.GetWorkflowId()] = checksum - sendUpdate(ctx, client, actionStatus, data, checksum) + sendUpdate(ctx, w.logger, w.client, actionStatus, data, checksum) } else { newSHA := base64.StdEncoding.EncodeToString(h.Sum(data)) if !strings.EqualFold(workflowDataSHA[actionStatus.GetWorkflowId()], newSHA) { - sendUpdate(ctx, client, actionStatus, data, newSHA) + sendUpdate(ctx, w.logger, w.client, actionStatus, data, newSHA) } } } } -func sendUpdate(ctx context.Context, client pb.WorkflowSvcClient, st *pb.WorkflowActionStatus, data []byte, checksum string) { +func sendUpdate(ctx context.Context, logger log.Logger, client pb.WorkflowSvcClient, st *pb.WorkflowActionStatus, data []byte, checksum string) { l := logger.With("workflowID", st.GetWorkflowId, "workerID", st.GetWorkerId(), "actionName", st.GetActionName(), @@ -348,7 +476,7 @@ func openDataFile(wfDir string, l log.Logger) *os.File { return f } -func isValidDataFile(f *os.File, data []byte, l log.Logger) bool { +func isValidDataFile(f *os.File, maxSize int64, data []byte, l log.Logger) bool { var dataMap map[string]interface{} err := json.Unmarshal(data, &dataMap) if err != nil { @@ -358,17 +486,9 @@ func isValidDataFile(f *os.File, data []byte, l log.Logger) bool { stat, err := f.Stat() if err != nil { - logger.Error(err) + l.Error(err) return false } - val := os.Getenv(maxFileSize) - if val != "" { - maxSize, err := strconv.ParseInt(val, 10, 64) - if err == nil { - logger.Error(err) - } - return stat.Size() <= maxSize - } - return stat.Size() <= defaultMaxFileSize + return stat.Size() <= maxSize } diff --git a/cmd/tink-worker/main.go b/cmd/tink-worker/main.go index 3259e2537..8a8c0cea0 100644 --- a/cmd/tink-worker/main.go +++ b/cmd/tink-worker/main.go @@ -2,103 +2,31 @@ package main import ( "os" - "strconv" - "time" "github.com/packethost/pkg/log" - "github.com/pkg/errors" - "github.com/tinkerbell/tink/client" - pb "github.com/tinkerbell/tink/protos/workflow" - "google.golang.org/grpc" + "github.com/tinkerbell/tink/cmd/tink-worker/cmd" ) const ( - retryIntervalDefault = 3 - retryCountDefault = 3 - - serviceKey = "github.com/tinkerbell/tink" - invalidRetryInterval = "invalid RETRY_INTERVAL, using default (seconds)" - invalidMaxRetry = "invalid MAX_RETRY, using default" - - errWorker = "worker finished with error" + serviceKey = "github.com/tinkerbell/tink" ) var ( - rClient pb.WorkflowSvcClient - retryInterval time.Duration - retries int - logger log.Logger - // version is set at build time version = "devel" ) func main() { - log, err := log.Init(serviceKey) + logger, err := log.Init(serviceKey) if err != nil { panic(err) } - logger = log - defer logger.Close() - log.With("version", version).Info("starting") - setupRetry() - if setupErr := client.Setup(); setupErr != nil { - log.Error(setupErr) - os.Exit(1) - } - conn, err := tryClientConnection() - if err != nil { - log.Error(err) - os.Exit(1) - } - rClient = pb.NewWorkflowSvcClient(conn) - err = processWorkflowActions(rClient) - if err != nil { - log.Error(errors.Wrap(err, errWorker)) - } -} -func tryClientConnection() (*grpc.ClientConn, error) { - var err error - for r := 1; r <= retries; r++ { - c, e := client.GetConnection() - if e != nil { - err = e - logger.With("error", err, "duration", retryInterval).Info("failed to connect, sleeping before retrying") - <-time.After(retryInterval * time.Second) - continue - } - return c, nil - } - return nil, err -} + defer logger.Close() -func setupRetry() { - interval := os.Getenv("RETRY_INTERVAL") - if interval == "" { - logger.With("default", retryIntervalDefault).Info("RETRY_INTERVAL not set") - retryInterval = retryIntervalDefault - } else { - interval, err := time.ParseDuration(interval) - if err != nil { - logger.With("default", retryIntervalDefault).Info(invalidRetryInterval) - retryInterval = retryIntervalDefault - } else { - retryInterval = interval - } - } + rootCmd := cmd.NewRootCommand(version, logger) - maxRetry := os.Getenv("MAX_RETRY") - if maxRetry == "" { - logger.With("default", retryCountDefault).Info("MAX_RETRY not set") - retries = retryCountDefault - } else { - max, err := strconv.Atoi(maxRetry) - if err != nil { - logger.With("default", retryCountDefault).Info(invalidMaxRetry) - retries = retryCountDefault - } else { - retries = max - } + if err := rootCmd.Execute(); err != nil { + os.Exit(1) } } diff --git a/go.mod b/go.mod index 745fb0f67..32059aa18 100644 --- a/go.mod +++ b/go.mod @@ -25,12 +25,16 @@ require ( github.com/packethost/pkg v0.0.0-20200903155310-0433e0605550 github.com/pkg/errors v0.8.1 github.com/prometheus/client_golang v1.3.0 + github.com/prometheus/common v0.7.0 + github.com/rollbar/rollbar-go v1.0.2 // indirect github.com/rubenv/sql-migrate v0.0.0-20200616145509-8d140a17f351 github.com/sirupsen/logrus v1.4.2 github.com/spf13/cobra v1.0.0 + github.com/spf13/pflag v1.0.3 github.com/spf13/viper v1.4.0 github.com/stretchr/testify v1.4.0 go.mongodb.org/mongo-driver v1.1.2 // indirect + golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550 golang.org/x/net v0.0.0-20200324143707-d3edc9973b7e // indirect golang.org/x/sys v0.0.0-20200331124033-c3d80250170d // indirect google.golang.org/genproto v0.0.0-20200526211855-cb27e3aa2013 diff --git a/http-server/http_handlers.go b/http-server/http_handlers.go index ffcb7c16b..89264a52d 100644 --- a/http-server/http_handlers.go +++ b/http-server/http_handlers.go @@ -23,6 +23,8 @@ import ( "google.golang.org/grpc/status" ) +// RegisterHardwareServiceHandlerFromEndpoint serves Hardware requests at the +// given endpoint over GRPC func RegisterHardwareServiceHandlerFromEndpoint(ctx context.Context, mux *runtime.ServeMux, endpoint string, opts []grpc.DialOption) error { conn, err := grpc.Dial(endpoint, opts...) if err != nil { @@ -214,6 +216,8 @@ func RegisterHardwareServiceHandlerFromEndpoint(ctx context.Context, mux *runtim return nil } +// RegisterTemplateHandlerFromEndpoint serves Template requests at the given +// endpoint over GRPC func RegisterTemplateHandlerFromEndpoint(ctx context.Context, mux *runtime.ServeMux, endpoint string, opts []grpc.DialOption) error { conn, err := grpc.Dial(endpoint, opts...) if err != nil { @@ -347,6 +351,8 @@ func RegisterTemplateHandlerFromEndpoint(ctx context.Context, mux *runtime.Serve return nil } +// RegisterWorkflowSvcHandlerFromEndpoint serves Workflow requests at the given +// endpoint over GRPC func RegisterWorkflowSvcHandlerFromEndpoint(ctx context.Context, mux *runtime.ServeMux, endpoint string, opts []grpc.DialOption) error { conn, err := grpc.Dial(endpoint, opts...) if err != nil { From 70ea6b49d1385304afc407de214625c5319f9475 Mon Sep 17 00:00:00 2001 From: Marques Johansson Date: Tue, 22 Sep 2020 23:39:30 -0400 Subject: [PATCH 3/4] worker: use local (root) command arguments instead of global (Persistent) arguments Signed-off-by: Marques Johansson --- cmd/tink-worker/cmd/root.go | 76 +++++++++++++++--------------- cmd/tink-worker/internal/action.go | 1 - cmd/tink-worker/internal/worker.go | 3 +- http-server/http_handlers.go | 3 +- 4 files changed, 41 insertions(+), 42 deletions(-) diff --git a/cmd/tink-worker/cmd/root.go b/cmd/tink-worker/cmd/root.go index 39ec6508f..f92a78fd1 100644 --- a/cmd/tink-worker/cmd/root.go +++ b/cmd/tink-worker/cmd/root.go @@ -3,7 +3,6 @@ package cmd import ( "context" "fmt" - "os" "strings" "time" @@ -19,42 +18,36 @@ import ( ) const ( - retryIntervalDefault = 3 - retryCountDefault = 3 - defaultMaxFileSize int64 = 10485760 //10MB ~= 10485760Bytes + defaultRetryInterval = 3 + defaultRetryCount = 3 + defaultMaxFileSize int64 = 10 * 1024 * 1024 //10MB defaultTimeoutMinutes = 60 ) // NewRootCommand creates a new Tink Worker Cobra root command func NewRootCommand(version string, logger log.Logger) *cobra.Command { - must := func(err error) { - if err != nil { - logger.Fatal(err) - } - } - rootCmd := &cobra.Command{ Use: "tink-worker", Short: "Tink Worker", Version: version, - PersistentPreRunE: func(cmd *cobra.Command, args []string) error { - viper, err := createViper() + PreRunE: func(cmd *cobra.Command, args []string) error { + viper, err := createViper(logger) if err != nil { return err } return applyViper(viper, cmd) }, RunE: func(cmd *cobra.Command, args []string) error { - retryInterval, _ := cmd.PersistentFlags().GetDuration("retry-interval") - retries, _ := cmd.PersistentFlags().GetInt("retries") + retryInterval, _ := cmd.Flags().GetDuration("retry-interval") + retries, _ := cmd.Flags().GetInt("retries") // TODO(displague) is log-level no longer useful? - // logLevel, _ := cmd.PersistentFlags().GetString("log-level") - workerID, _ := cmd.PersistentFlags().GetString("id") - maxFileSize, _ := cmd.PersistentFlags().GetInt64("max-file-size") - timeOut, _ := cmd.PersistentFlags().GetDuration("timeout") - user, _ := cmd.PersistentFlags().GetString("registry-username") - pwd, _ := cmd.PersistentFlags().GetString("registry-password") - registry, _ := cmd.PersistentFlags().GetString("docker-registry") + // logLevel, _ := cmd.Flags().GetString("log-level") + workerID, _ := cmd.Flags().GetString("id") + maxFileSize, _ := cmd.Flags().GetInt64("max-file-size") + timeOut, _ := cmd.Flags().GetDuration("timeout") + user, _ := cmd.Flags().GetString("registry-username") + pwd, _ := cmd.Flags().GetString("registry-password") + registry, _ := cmd.Flags().GetString("docker-registry") logger.With("version", version).Info("starting") if setupErr := client.Setup(); setupErr != nil { @@ -85,27 +78,33 @@ func NewRootCommand(version string, logger log.Logger) *cobra.Command { }, } - rootCmd.PersistentFlags().Duration("retry-interval", retryIntervalDefault, "Retry interval in seconds") + rootCmd.Flags().Duration("retry-interval", defaultRetryInterval, "Retry interval in seconds") - rootCmd.PersistentFlags().Duration("timeout", time.Duration(defaultTimeoutMinutes*time.Minute), "Max duration to wait for worker to complete") + rootCmd.Flags().Duration("timeout", time.Duration(defaultTimeoutMinutes*time.Minute), "Max duration to wait for worker to complete") - rootCmd.PersistentFlags().Int("max-retry", retryCountDefault, "Maximum number of retries to attempt") + rootCmd.Flags().Int("max-retry", defaultRetryCount, "Maximum number of retries to attempt") - rootCmd.PersistentFlags().Int64("max-file-size", defaultMaxFileSize, "Maximum file size in bytes") + rootCmd.Flags().Int64("max-file-size", defaultMaxFileSize, "Maximum file size in bytes") - // rootCmd.PersistentFlags().String("log-level", "info", "Sets the worker log level (panic, fatal, error, warn, info, debug, trace)") + // rootCmd.Flags().String("log-level", "info", "Sets the worker log level (panic, fatal, error, warn, info, debug, trace)") + + must := func(err error) { + if err != nil { + logger.Fatal(err) + } + } - rootCmd.PersistentFlags().StringP("id", "i", "", "Sets the worker id") - must(rootCmd.MarkPersistentFlagRequired("id")) + rootCmd.Flags().StringP("id", "i", "", "Sets the worker id") + must(rootCmd.MarkFlagRequired("id")) - rootCmd.PersistentFlags().StringP("docker-registry", "r", "", "Sets the Docker registry") - must(rootCmd.MarkPersistentFlagRequired("docker-registry")) + rootCmd.Flags().StringP("docker-registry", "r", "", "Sets the Docker registry") + must(rootCmd.MarkFlagRequired("docker-registry")) - rootCmd.PersistentFlags().StringP("registry-username", "u", "", "Sets the registry username") - must(rootCmd.MarkPersistentFlagRequired("registry-username")) + rootCmd.Flags().StringP("registry-username", "u", "", "Sets the registry username") + must(rootCmd.MarkFlagRequired("registry-username")) - rootCmd.PersistentFlags().StringP("registry-password", "p", "", "Sets the registry-password") - must(rootCmd.MarkPersistentFlagRequired("registry-password")) + rootCmd.Flags().StringP("registry-password", "p", "", "Sets the registry-password") + must(rootCmd.MarkFlagRequired("registry-password")) return rootCmd } @@ -113,22 +112,23 @@ func NewRootCommand(version string, logger log.Logger) *cobra.Command { // createViper creates a Viper object configured to read in configuration files // (from various paths with content type specific filename extensions) and load // environment variables that start with TINK_WORKER. -func createViper() (*viper.Viper, error) { +func createViper(logger log.Logger) (*viper.Viper, error) { v := viper.New() v.AutomaticEnv() v.SetConfigName("tink-worker") v.AddConfigPath("/etc/tinkerbell") v.AddConfigPath(".") - v.SetEnvPrefix("TINK_WORKER") v.SetEnvKeyReplacer(strings.NewReplacer("-", "_")) // If a config file is found, read it in. if err := v.ReadInConfig(); err != nil { if _, ok := err.(viper.ConfigFileNotFoundError); !ok { + logger.With("configFile", v.ConfigFileUsed()).Error(err, "could not load config file") return nil, err } + logger.Info("no config file found") } else { - fmt.Fprintln(os.Stderr, "Using config file:", v.ConfigFileUsed()) + logger.With("configFile", v.ConfigFileUsed()).Info("loaded config file") } return v, nil @@ -137,7 +137,7 @@ func createViper() (*viper.Viper, error) { func applyViper(v *viper.Viper, cmd *cobra.Command) error { errors := []error{} - cmd.PersistentFlags().VisitAll(func(f *pflag.Flag) { + cmd.Flags().VisitAll(func(f *pflag.Flag) { if !f.Changed && v.IsSet(f.Name) { val := v.Get(f.Name) if err := cmd.Flags().Set(f.Name, fmt.Sprintf("%v", val)); err != nil { diff --git a/cmd/tink-worker/internal/action.go b/cmd/tink-worker/internal/action.go index b9815b645..22b56608d 100644 --- a/cmd/tink-worker/internal/action.go +++ b/cmd/tink-worker/internal/action.go @@ -14,7 +14,6 @@ import ( ) const ( - errContextClosed = "failed to wait for container, context closed" errCreateContainer = "failed to create container" errFailedToWait = "failed to wait for completion of action" errFailedToRunCmd = "failed to run on-timeout command" diff --git a/cmd/tink-worker/internal/worker.go b/cmd/tink-worker/internal/worker.go index aa2918b7f..310318cff 100644 --- a/cmd/tink-worker/internal/worker.go +++ b/cmd/tink-worker/internal/worker.go @@ -98,8 +98,7 @@ func (w *Worker) execute(ctx context.Context, wfID string, action *pb.WorkflowAc l := w.logger.With("workflowID", wfID, "workerID", action.GetWorkerId(), "actionName", action.GetName(), "actionImage", action.GetImage()) cli := w.registryClient - err := w.regConn.pullImage(ctx, cli, action.GetImage()) - if err != nil { + if err := w.regConn.pullImage(ctx, cli, action.GetImage()); err != nil { return pb.ActionState_ACTION_IN_PROGRESS, errors.Wrap(err, "DOCKER PULL") } id, err := w.createContainer(ctx, action.Command, wfID, action) diff --git a/http-server/http_handlers.go b/http-server/http_handlers.go index 89264a52d..cfba032af 100644 --- a/http-server/http_handlers.go +++ b/http-server/http_handlers.go @@ -8,7 +8,8 @@ import ( "net/http" tt "text/template" - // nolint:staticcheck SA1019 We will do it later + // nolint:staticcheck + // SA1019 We will do it later "github.com/golang/protobuf/jsonpb" "github.com/tinkerbell/tink/protos/template" From e78b8527d88b11ba56faad2008ab9dd410f38e00 Mon Sep 17 00:00:00 2001 From: Marques Johansson Date: Mon, 28 Sep 2020 09:46:05 -0400 Subject: [PATCH 4/4] add environment variable names to tink-worker help Signed-off-by: Marques Johansson --- cmd/tink-worker/cmd/root.go | 20 ++++++++++---------- 1 file changed, 10 insertions(+), 10 deletions(-) diff --git a/cmd/tink-worker/cmd/root.go b/cmd/tink-worker/cmd/root.go index f92a78fd1..02ad1ed9f 100644 --- a/cmd/tink-worker/cmd/root.go +++ b/cmd/tink-worker/cmd/root.go @@ -78,13 +78,13 @@ func NewRootCommand(version string, logger log.Logger) *cobra.Command { }, } - rootCmd.Flags().Duration("retry-interval", defaultRetryInterval, "Retry interval in seconds") + rootCmd.Flags().Duration("retry-interval", defaultRetryInterval, "Retry interval in seconds (RETRY_INTERVAL)") - rootCmd.Flags().Duration("timeout", time.Duration(defaultTimeoutMinutes*time.Minute), "Max duration to wait for worker to complete") + rootCmd.Flags().Duration("timeout", time.Duration(defaultTimeoutMinutes*time.Minute), "Max duration to wait for worker to complete (TIMEOUT)") - rootCmd.Flags().Int("max-retry", defaultRetryCount, "Maximum number of retries to attempt") + rootCmd.Flags().Int("max-retry", defaultRetryCount, "Maximum number of retries to attempt (MAX_RETRY)") - rootCmd.Flags().Int64("max-file-size", defaultMaxFileSize, "Maximum file size in bytes") + rootCmd.Flags().Int64("max-file-size", defaultMaxFileSize, "Maximum file size in bytes (MAX_FILE_SIZE)") // rootCmd.Flags().String("log-level", "info", "Sets the worker log level (panic, fatal, error, warn, info, debug, trace)") @@ -94,24 +94,24 @@ func NewRootCommand(version string, logger log.Logger) *cobra.Command { } } - rootCmd.Flags().StringP("id", "i", "", "Sets the worker id") + rootCmd.Flags().StringP("id", "i", "", "Sets the worker id (ID)") must(rootCmd.MarkFlagRequired("id")) - rootCmd.Flags().StringP("docker-registry", "r", "", "Sets the Docker registry") + rootCmd.Flags().StringP("docker-registry", "r", "", "Sets the Docker registry (DOCKER_REGISTRY)") must(rootCmd.MarkFlagRequired("docker-registry")) - rootCmd.Flags().StringP("registry-username", "u", "", "Sets the registry username") + rootCmd.Flags().StringP("registry-username", "u", "", "Sets the registry username (REGISTRY_USERNAME)") must(rootCmd.MarkFlagRequired("registry-username")) - rootCmd.Flags().StringP("registry-password", "p", "", "Sets the registry-password") + rootCmd.Flags().StringP("registry-password", "p", "", "Sets the registry-password (REGISTRY_PASSWORD)") must(rootCmd.MarkFlagRequired("registry-password")) return rootCmd } // createViper creates a Viper object configured to read in configuration files -// (from various paths with content type specific filename extensions) and load -// environment variables that start with TINK_WORKER. +// (from various paths with content type specific filename extensions) and loads +// environment variables. func createViper(logger log.Logger) (*viper.Viper, error) { v := viper.New() v.AutomaticEnv()