Skip to content

Commit

Permalink
[BEAM-14487] Make drain & update terminal states. (apache#17710)
Browse files Browse the repository at this point in the history
  • Loading branch information
lostluck authored May 19, 2022
1 parent 4689462 commit de497f7
Show file tree
Hide file tree
Showing 2 changed files with 70 additions and 16 deletions.
49 changes: 33 additions & 16 deletions sdks/go/pkg/beam/runners/dataflow/dataflowlib/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -240,29 +240,46 @@ func WaitForCompletion(ctx context.Context, client *df.Service, project, region,
return errors.Wrap(err, "failed to get job")
}

switch j.CurrentState {
case "JOB_STATE_DONE":
log.Info(ctx, "Job succeeded!")
return nil

case "JOB_STATE_CANCELLED":
log.Info(ctx, "Job cancelled")
terminal, msg, err := currentStateMessage(j.CurrentState, jobID)
if err != nil {
return err
}
log.Infof(ctx, msg)
if terminal {
return nil

case "JOB_STATE_FAILED":
return errors.Errorf("job %s failed", jobID)

case "JOB_STATE_RUNNING":
log.Info(ctx, "Job still running ...")

default:
log.Infof(ctx, "Job state: %v ...", j.CurrentState)
}

time.Sleep(30 * time.Second)
}
}

// currentStateMessage indicates if the state is terminal, and provides a message to log, or an error.
// Errors are always terminal.
func currentStateMessage(currentState, jobID string) (bool, string, error) {
switch currentState {
// Add all Terminal Success stats here.
case "JOB_STATE_DONE", "JOB_STATE_CANCELLED", "JOB_STATE_DRAINED", "JOB_STATE_UPDATED":
var state string
switch currentState {
case "JOB_STATE_DONE":
state = "succeeded!"
case "JOB_STATE_CANCELLED":
state = "cancelled"
case "JOB_STATE_DRAINED":
state = "drained"
case "JOB_STATE_UPDATED":
state = "updated"
}
return true, fmt.Sprintf("Job %v %v", jobID, state), nil
case "JOB_STATE_FAILED":
return true, "", errors.Errorf("Job %s failed", jobID)
case "JOB_STATE_RUNNING":
return false, "Job still running ...", nil
default:
return false, fmt.Sprintf("Job state: %v ...", currentState), nil
}
}

// NewClient creates a new dataflow client with default application credentials
// and CloudPlatformScope. The Dataflow endpoint is optionally overridden.
func NewClient(ctx context.Context, endpoint string) (*df.Service, error) {
Expand Down
37 changes: 37 additions & 0 deletions sdks/go/pkg/beam/runners/dataflow/dataflowlib/job_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package dataflowlib

import (
"context"
"fmt"
"reflect"
"testing"
)
Expand Down Expand Up @@ -157,3 +158,39 @@ func TestValidateWorkerSettings(t *testing.T) {
})
}
}

func TestCurrentStateMessage(t *testing.T) {
tests := []struct {
state string
term bool
want string
wantErr error
}{
{state: "JOB_STATE_DONE", want: "Job JorbID-09876 succeeded!", term: true},
{state: "JOB_STATE_DRAINED", want: "Job JorbID-09876 drained", term: true},
{state: "JOB_STATE_UPDATED", want: "Job JorbID-09876 updated", term: true},
{state: "JOB_STATE_CANCELLED", want: "Job JorbID-09876 cancelled", term: true},
{state: "JOB_STATE_RUNNING", want: "Job still running ...", term: false},
{state: "JOB_STATE_FAILED", wantErr: fmt.Errorf("Job JorbID-09876 failed"), term: true},
{state: "Ossiphrage", want: "Job state: Ossiphrage ...", term: false},
}
for _, test := range tests {
t.Run(test.state, func(t *testing.T) {
const jobID = "JorbID-09876"
term, got, err := currentStateMessage(test.state, jobID)
if term != test.term {
termGot, termWant := "false (continues)", "true (terminal)"
if !test.term {
termGot, termWant = termWant, termGot
}
t.Errorf("currentStateMessage(%v, %q) = %v, want %v", test.state, jobID, termGot, termWant)
}
if err != nil && err.Error() != test.wantErr.Error() {
t.Errorf("currentStateMessage(%v, %q) = %v, want %v", test.state, jobID, err, test.wantErr)
}
if got != test.want {
t.Errorf("currentStateMessage(%v, %q) = %v, want %v", test.state, jobID, got, test.want)
}
})
}
}

0 comments on commit de497f7

Please sign in to comment.