diff --git a/commands/activations.go b/commands/activations.go index 2362ad6dd..875215912 100644 --- a/commands/activations.go +++ b/commands/activations.go @@ -17,13 +17,17 @@ import ( "encoding/json" "fmt" "io" + "os" + "os/signal" "regexp" + "syscall" "time" "github.com/apache/openwhisk-client-go/whisk" "github.com/digitalocean/doctl" "github.com/digitalocean/doctl/commands/charm/text" "github.com/digitalocean/doctl/commands/displayers" + "github.com/digitalocean/doctl/do" "github.com/spf13/cobra" ) @@ -63,8 +67,9 @@ logs.`, `Use `+"`"+`doctl serverless activations list`+"`"+` to list the activation records that are present in the cloud for previously invoked functions.`, Writer, + aliasOpt("ls"), displayerType(&displayers.Activation{}), - aliasOpt("ls")) + ) AddIntFlag(list, "limit", "l", 30, "only return LIMIT number of activations (default 30, max 200)") AddIntFlag(list, "skip", "s", 0, "exclude the first SKIP number of activations from the result") AddIntFlag(list, "since", "", 0, "return activations with timestamps later than SINCE; measured in milliseconds since Th, 01, Jan 1970") @@ -84,6 +89,10 @@ for new arrivals.`, AddBoolFlag(logs, "strip", "r", false, "strip timestamp information and output first line only") AddBoolFlag(logs, "follow", "", false, "Fetch logs continuously") + // This is the default behavior, so we want to prevent users from explicitly using this flag. We don't want to remove it + // to maintain backwards compatibility + logs.Flags().MarkHidden("last") + result := CmdBuilder(cmd, RunActivationsResult, "result []", "Retrieves the Results for an Activation", `Use `+"`"+`doctl serverless activations result`+"`"+` to retrieve just the results portion of one or more activation records.`, @@ -176,7 +185,7 @@ func makeBanner(writer io.Writer, activation whisk.Activation) { end := time.UnixMilli(activation.End).Format("01/02 03:04:05") init := text.NewStyled("=== ").Muted() body := fmt.Sprintf("%s %s %s %s:%s", activation.ActivationID, displayers.GetActivationStatus(activation.StatusCode), - end, activation.Name, activation.Version) + end, displayers.GetActivationFunctionName(activation), activation.Version) msg := text.NewStyled(body).Highlight() fmt.Fprintln(writer, init.String()+msg.String()) } @@ -193,7 +202,7 @@ func printLogs(writer io.Writer, strip bool, activation whisk.Activation) { // dtsRegex is a regular expression that matches the prefix of some activation log entries. // It is used by stripLog to remove that prefix -var dtsRegex = regexp.MustCompile(`\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:.*: `) +var dtsRegex = regexp.MustCompile(`(?U)\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:.*: `) // stripLog strips the prefix from log entries func stripLog(entry string) string { @@ -288,31 +297,146 @@ func RunActivationsList(c *CmdConfig) error { // RunActivationsLogs supports the 'activations logs' command func RunActivationsLogs(c *CmdConfig) error { argCount := len(c.Args) + if argCount > 1 { return doctl.NewTooManyArgsErr(c.NS) } - replaceFunctionWithAction(c) - augmentPackageWithDeployed(c) - if isWatching(c) { - return RunServerlessExecStreaming(activationLogs, c, []string{flagLast, flagStrip, flagWatch, flagDeployed}, []string{flagAction, flagPackage, flagLimit}) + + var activationId string + if argCount == 1 { + activationId = c.Args[0] + } + + sls := c.Serverless() + + limitFlag, _ := c.Doit.GetInt(c.NS, flagLimit) + stripFlag, _ := c.Doit.GetBool(c.NS, flagStrip) + followFlag, _ := c.Doit.GetBool(c.NS, flagFollow) + functionFlag, _ := c.Doit.GetString(c.NS, flagFunction) + packageFlag, _ := c.Doit.GetString(c.NS, flagPackage) + + limit := limitFlag + if limitFlag > 200 { + limit = 200 + } + + if activationId != "" { + actv, err := sls.GetActivationLogs(activationId) + if err != nil { + return err + } + printLogs(c.Out, stripFlag, actv) + return nil + + } else if followFlag { + sigChannel := make(chan os.Signal, 1) + signal.Notify(sigChannel, os.Interrupt, syscall.SIGTERM) + errChannel := make(chan error, 1) + + go pollActivations(errChannel, sls, c.Out, functionFlag, packageFlag) + + select { + case <-sigChannel: + fmt.Fprintf(c.Out, "\r") + return nil + case e := <-errChannel: + return e + } } - output, err := RunServerlessExec(activationLogs, c, []string{flagLast, flagStrip, flagWatch, flagDeployed}, []string{flagAction, flagPackage, flagLimit}) + + listOptions := whisk.ActivationListOptions{Limit: limit, Name: functionFlag, Docs: true} + actvs, err := sls.ListActivations(listOptions) + if err != nil { return err } - return c.PrintServerlessTextOutput(output) + + if packageFlag != "" { + actvs = filterPackages(actvs, packageFlag) + } + + for _, a := range reverseActivations(actvs) { + makeBanner(c.Out, a) + printLogs(c.Out, stripFlag, a) + fmt.Fprintln(c.Out) + } + return nil +} + +// Polls the ActivationList API at an interval and prints the results. +func pollActivations(ec chan error, sls do.ServerlessService, writer io.Writer, functionFlag string, packageFlag string) { + ticker := time.NewTicker(time.Second * 5) + tc := ticker.C + var lastActivationTimestamp int64 = 0 + requestLimit := 1 + + // There seems to be a race condition where functions invocation that start before lastActivationTimestamp + // but is not completed by the time we make the list activation request will display twice. So prevent this issue + // we keep track of the activation ids displayed, so we don't display the logs twice. + printedActivations := map[string]int64{} + + for { + select { + case <-tc: + options := whisk.ActivationListOptions{Limit: requestLimit, Since: lastActivationTimestamp, Docs: true, Name: functionFlag} + actv, err := sls.ListActivations(options) + + if err != nil { + ec <- err + ticker.Stop() + break + } + + if packageFlag != "" { + actv = filterPackages(actv, packageFlag) + } + + if len(actv) > 0 { + for _, activation := range reverseActivations(actv) { + _, knownActivation := printedActivations[activation.ActivationID] + + if knownActivation { + continue + } + + printedActivations[activation.ActivationID] = activation.Start + + makeBanner(writer, activation) + printLogs(writer, false, activation) + fmt.Fprintln(writer) + } + + lastItem := actv[len(actv)-1] + lastActivationTimestamp = lastItem.Start + 100 + requestLimit = 0 + } + } + } } -// isWatching (1) modifies the config replacing the "follow" flag (significant to doctl) with the -// "watch" flag (significant to nim) (2) Returns whether the command should be run in streaming mode -// (will be true iff follow/watch is true). -func isWatching(c *CmdConfig) bool { - yes, err := c.Doit.GetBool(c.NS, flagFollow) - if yes && err == nil { - c.Doit.Set(c.NS, flagWatch, true) - return true +// Filters the activations to only return activations belonging to the package. +func filterPackages(activations []whisk.Activation, packageName string) []whisk.Activation { + filteredActv := []whisk.Activation{} + + for _, activation := range activations { + inPackage := displayers.GetActivationPackageName(activation) == packageName + if inPackage { + filteredActv = append(filteredActv, activation) + } } - return false + return filteredActv +} + +func reverseActivations(actv []whisk.Activation) []whisk.Activation { + a := make([]whisk.Activation, len(actv)) + copy(a, actv) + + for i := len(a)/2 - 1; i >= 0; i-- { + opp := len(a) - 1 - i + a[i], a[opp] = a[opp], a[i] + } + + return a } // RunActivationsResult supports the 'activations result' command @@ -369,23 +493,3 @@ func RunActivationsResult(c *CmdConfig) error { } return nil } - -// replaceFunctionWithAction detects that --function was specified and renames it to --action (which is what nim -// will expect to see). -func replaceFunctionWithAction(c *CmdConfig) { - value, err := c.Doit.GetString(c.NS, flagFunction) - if err == nil && value != "" { - c.Doit.Set(c.NS, flagFunction, "") - c.Doit.Set(c.NS, flagAction, value) - } -} - -// augmentPackageWithDeployed detects that --package was specified and adds the --deployed flag if so. -// The code in 'nim' (inherited from Adobe I/O) will otherwise look for a deployment manifest which we -// don't want to support here. -func augmentPackageWithDeployed(c *CmdConfig) { - value, err := c.Doit.GetString(c.NS, flagPackage) - if err == nil && value != "" { - c.Doit.Set(c.NS, flagDeployed, true) - } -} diff --git a/commands/activations_test.go b/commands/activations_test.go index bf570a9f8..ed30042ca 100644 --- a/commands/activations_test.go +++ b/commands/activations_test.go @@ -15,7 +15,7 @@ package commands import ( "bytes" - "os/exec" + "errors" "sort" "strconv" "strings" @@ -23,7 +23,6 @@ import ( "time" "github.com/apache/openwhisk-client-go/whisk" - "github.com/digitalocean/doctl/do" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) @@ -436,67 +435,57 @@ func TestActivationsList(t *testing.T) { func TestActivationsLogs(t *testing.T) { tests := []struct { - name string - doctlArgs string - doctlFlags map[string]string - expectedNimArgs []string - expectStream bool + name string + doctlArgs string + doctlFlags map[string]string }{ { - name: "no flags or args", - expectedNimArgs: []string{}, - }, - { - name: "no flags with ID", - doctlArgs: "activationid", - expectedNimArgs: []string{"activationid"}, - }, - { - name: "last flag", - doctlFlags: map[string]string{"last": ""}, - expectedNimArgs: []string{"--last"}, - }, - { - name: "limit flag", - doctlFlags: map[string]string{"limit": "10"}, - expectedNimArgs: []string{"--limit", "10"}, + name: "no flags or args", }, { - name: "function flag", - doctlFlags: map[string]string{"function": "sample"}, - expectedNimArgs: []string{"--action", "sample"}, - }, - { - name: "package flag", - doctlFlags: map[string]string{"package": "sample"}, - expectedNimArgs: []string{"--deployed", "--package", "sample"}, + name: "no flags with ID", + doctlArgs: "123-abc", }, { - name: "follow flag", - doctlFlags: map[string]string{"follow": ""}, - expectedNimArgs: []string{"--watch"}, - expectStream: true, + name: "multiple limit flags", + doctlFlags: map[string]string{"limit": "10", "function": "hello1"}, }, { - name: "strip flag", - doctlFlags: map[string]string{"strip": ""}, - expectedNimArgs: []string{"--strip"}, + name: "follow flag", + doctlFlags: map[string]string{"follow": ""}, }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { withTestClient(t, func(config *CmdConfig, tm *tcMocks) { - fakeCmd := &exec.Cmd{ - Stdout: config.Out, - } - if tt.doctlArgs != "" { config.Args = append(config.Args, tt.doctlArgs) } + follow := false + activationId := "" + if len(config.Args) == 1 { + activationId = config.Args[0] + } + + var limit interface{} + var funcName interface{} + if tt.doctlFlags != nil { for k, v := range tt.doctlFlags { + if k == "limit" { + limit, _ = strconv.ParseInt(v, 0, 64) + } + + if k == "follow" { + follow = true + } + + if k == "function" { + funcName = v + } + if v == "" { config.Doit.Set(config.NS, k, true) } else { @@ -505,18 +494,29 @@ func TestActivationsLogs(t *testing.T) { } } - tm.serverless.EXPECT().CheckServerlessStatus().MinTimes(1).Return(nil) - if tt.expectStream { - expectedArgs := append([]string{"activation/logs"}, tt.expectedNimArgs...) - tm.serverless.EXPECT().Cmd("nocapture", expectedArgs).Return(fakeCmd, nil) - tm.serverless.EXPECT().Stream(fakeCmd).Return(nil) + if activationId != "" { + tm.serverless.EXPECT().GetActivationLogs(activationId).Return(theActivations[0], nil) + err := RunActivationsLogs(config) + require.NoError(t, err) + } else if follow { + expectedListOptions := whisk.ActivationListOptions{Limit: 1, Docs: true} + tm.serverless.EXPECT().ListActivations(expectedListOptions).Return(nil, errors.New("Something went wrong")) + err := RunActivationsLogs(config) + require.Error(t, err) + } else { - tm.serverless.EXPECT().Cmd("activation/logs", tt.expectedNimArgs).Return(fakeCmd, nil) - tm.serverless.EXPECT().Exec(fakeCmd).Return(do.ServerlessOutput{}, nil) - } + expectedListOptions := whisk.ActivationListOptions{Docs: true} + if limit != nil { + expectedListOptions.Limit = int(limit.(int64)) + } - err := RunActivationsLogs(config) - require.NoError(t, err) + if funcName != nil { + expectedListOptions.Name = funcName.(string) + } + tm.serverless.EXPECT().ListActivations(expectedListOptions).Return(theActivations, nil) + err := RunActivationsLogs(config) + require.NoError(t, err) + } }) }) } diff --git a/commands/displayers/activations.go b/commands/displayers/activations.go index f7d30fbf8..e86e4a1b4 100644 --- a/commands/displayers/activations.go +++ b/commands/displayers/activations.go @@ -64,7 +64,7 @@ func (a *Activation) KV() []map[string]interface{} { "Start": getActivationStartType(actv), "Wait": getActivationAnnotationValue(actv, "waitTime"), "Duration": fmt.Sprintf("%dms", actv.Duration), - "Function": getActivationFunctionName(actv), + "Function": GetActivationFunctionName(actv), } out = append(out, o) } @@ -79,7 +79,7 @@ func getActivationStartType(a whisk.Activation) string { } // Gets the full function name for the activation. -func getActivationFunctionName(a whisk.Activation) string { +func GetActivationFunctionName(a whisk.Activation) string { name := a.Name path := getActivationAnnotationValue(a, "path") @@ -96,6 +96,27 @@ func getActivationFunctionName(a whisk.Activation) string { return name } +func GetActivationPackageName(a whisk.Activation) string { + name := a.Name + + if a.Annotations == nil { + return "" + } + + path := a.Annotations.GetValue("path") + + if path == nil { + return name + } + + parts := strings.Split(path.(string), "/") + + if len(parts) == 3 { + return parts[1] + } + + return "" +} func getActivationAnnotationValue(a whisk.Activation, key string) interface{} { if a.Annotations == nil { return nil diff --git a/do/serverless.go b/do/serverless.go index fde815f7f..73f57af0a 100644 --- a/do/serverless.go +++ b/do/serverless.go @@ -820,7 +820,11 @@ func (s *serverlessService) GetActivationCount(options whisk.ActivationCountOpti if err != nil { return empty, err } + resp, _, err := s.owClient.Activations.Count(&options) + if err != nil { + return empty, err + } return *resp, err } @@ -831,7 +835,11 @@ func (s *serverlessService) GetActivation(id string) (whisk.Activation, error) { if err != nil { return empty, err } + resp, _, err := s.owClient.Activations.Get(id) + if err != nil { + return empty, err + } return *resp, err } @@ -842,7 +850,12 @@ func (s *serverlessService) GetActivationLogs(id string) (whisk.Activation, erro if err != nil { return empty, err } + resp, _, err := s.owClient.Activations.Logs(id) + if err != nil { + return empty, err + } + return *resp, err } @@ -853,7 +866,11 @@ func (s *serverlessService) GetActivationResult(id string) (whisk.Response, erro if err != nil { return empty, err } + resp, _, err := s.owClient.Activations.Result(id) + if err != nil { + return empty, err + } return *resp, err }