From 625a4e2e8fd0fe0b14269700e63ed75abdbee50e Mon Sep 17 00:00:00 2001 From: lostluck <13907733+lostluck@users.noreply.github.com> Date: Wed, 18 May 2022 22:25:32 -0700 Subject: [PATCH 1/3] [BEAM-14487] Make drain & update terminal states. --- .../beam/runners/dataflow/dataflowlib/job.go | 49 +++++++++++++------ .../runners/dataflow/dataflowlib/job_test.go | 37 ++++++++++++++ 2 files changed, 70 insertions(+), 16 deletions(-) diff --git a/sdks/go/pkg/beam/runners/dataflow/dataflowlib/job.go b/sdks/go/pkg/beam/runners/dataflow/dataflowlib/job.go index 6aa0712daece..55f8b69a0237 100644 --- a/sdks/go/pkg/beam/runners/dataflow/dataflowlib/job.go +++ b/sdks/go/pkg/beam/runners/dataflow/dataflowlib/job.go @@ -240,29 +240,46 @@ func WaitForCompletion(ctx context.Context, client *df.Service, project, region, return errors.Wrap(err, "failed to get job") } - switch j.CurrentState { - case "JOB_STATE_DONE": - log.Info(ctx, "Job succeeded!") - return nil - - case "JOB_STATE_CANCELLED": - log.Info(ctx, "Job cancelled") + terminal, msg, err := currentStateMessage(j.CurrentState, jobID) + if err != nil { + return err + } + log.Infof(ctx, msg) + if terminal { return nil - - case "JOB_STATE_FAILED": - return errors.Errorf("job %s failed", jobID) - - case "JOB_STATE_RUNNING": - log.Info(ctx, "Job still running ...") - - default: - log.Infof(ctx, "Job state: %v ...", j.CurrentState) } time.Sleep(30 * time.Second) } } +// currentStateMessage indicates if the state is terminal, and provides a message to log, or an error. +// Errors are always terminal. +func currentStateMessage(currentState, jobID string) (bool, string, error) { + switch currentState { + // Add all Terminal Success stats here. + case "JOB_STATE_DONE", "JOB_STATE_CANCELLED", "JOB_STATE_DRAINED", "JOB_STATE_UPDATED": + var state string + switch currentState { + case "JOB_STATE_DONE": + state = "succeeded" + case "JOB_STATE_CANCELLED": + state = "cancelled" + case "JOB_STATE_DRAINED": + state = "drained" + case "JOB_STATE_UPDATED": + state = "updated" + } + return true, fmt.Sprintf("Job %v %v", jobID, state), nil + case "JOB_STATE_FAILED": + return true, "", errors.Errorf("Job %s failed", jobID) + case "JOB_STATE_RUNNING": + return false, "Job still running ...", nil + default: + return false, fmt.Sprintf("Job state: %v ...", currentState), nil + } +} + // NewClient creates a new dataflow client with default application credentials // and CloudPlatformScope. The Dataflow endpoint is optionally overridden. func NewClient(ctx context.Context, endpoint string) (*df.Service, error) { diff --git a/sdks/go/pkg/beam/runners/dataflow/dataflowlib/job_test.go b/sdks/go/pkg/beam/runners/dataflow/dataflowlib/job_test.go index 1bf178f2700c..dd48b77212f8 100644 --- a/sdks/go/pkg/beam/runners/dataflow/dataflowlib/job_test.go +++ b/sdks/go/pkg/beam/runners/dataflow/dataflowlib/job_test.go @@ -17,6 +17,7 @@ package dataflowlib import ( "context" + "fmt" "reflect" "testing" ) @@ -157,3 +158,39 @@ func TestValidateWorkerSettings(t *testing.T) { }) } } + +func TestCurrentStateMessage(t *testing.T) { + tests := []struct { + state string + term bool + want string + wantErr error + }{ + {state: "JOB_STATE_DONE", want: "Job JorbID-09876 succeeded", term: true}, + {state: "JOB_STATE_DRAINED", want: "Job JorbID-09876 drained", term: true}, + {state: "JOB_STATE_UPDATED", want: "Job JorbID-09876 updated", term: true}, + {state: "JOB_STATE_CANCELLED", want: "Job JorbID-09876 cancelled", term: true}, + {state: "JOB_STATE_RUNNING", want: "Job still running ...", term: false}, + {state: "JOB_STATE_FAILED", wantErr: fmt.Errorf("Job JorbID-09876 failed"), term: true}, + {state: "Ossiphrage", want: "Job state: Ossiphrage ...", term: false}, + } + for _, test := range tests { + t.Run(test.state, func(t *testing.T) { + const jobID = "JorbID-09876" + term, got, err := currentStateMessage(test.state, jobID) + if term != test.term { + termGot, termWant := "false (continues)", "true (terminal)" + if !test.term { + termGot, termWant = termWant, termGot + } + t.Errorf("currentStateMessage(%v, %q) = %v, want %v", test.state, jobID, termGot, termWant) + } + if err != nil && err.Error() != test.wantErr.Error() { + t.Errorf("currentStateMessage(%v, %q) = %v, want %v", test.state, jobID, err, test.wantErr) + } + if got != test.want { + t.Errorf("currentStateMessage(%v, %q) = %v, want %v", test.state, jobID, got, test.want) + } + }) + } +} From 7eb76a7018e5eddd8748dda5afc7593a92ff5b85 Mon Sep 17 00:00:00 2001 From: Robert Burke Date: Thu, 19 May 2022 13:26:24 -0700 Subject: [PATCH 2/3] Update sdks/go/pkg/beam/runners/dataflow/dataflowlib/job.go Co-authored-by: Danny McCormick --- sdks/go/pkg/beam/runners/dataflow/dataflowlib/job.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdks/go/pkg/beam/runners/dataflow/dataflowlib/job.go b/sdks/go/pkg/beam/runners/dataflow/dataflowlib/job.go index 55f8b69a0237..459c3f52a4b0 100644 --- a/sdks/go/pkg/beam/runners/dataflow/dataflowlib/job.go +++ b/sdks/go/pkg/beam/runners/dataflow/dataflowlib/job.go @@ -262,7 +262,7 @@ func currentStateMessage(currentState, jobID string) (bool, string, error) { var state string switch currentState { case "JOB_STATE_DONE": - state = "succeeded" + state = "succeeded!" case "JOB_STATE_CANCELLED": state = "cancelled" case "JOB_STATE_DRAINED": From 8724957641aa99a143646e5d606dd81ac054ecee Mon Sep 17 00:00:00 2001 From: Robert Burke Date: Thu, 19 May 2022 13:26:30 -0700 Subject: [PATCH 3/3] Update sdks/go/pkg/beam/runners/dataflow/dataflowlib/job_test.go Co-authored-by: Danny McCormick --- sdks/go/pkg/beam/runners/dataflow/dataflowlib/job_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdks/go/pkg/beam/runners/dataflow/dataflowlib/job_test.go b/sdks/go/pkg/beam/runners/dataflow/dataflowlib/job_test.go index dd48b77212f8..1a366e7bebf0 100644 --- a/sdks/go/pkg/beam/runners/dataflow/dataflowlib/job_test.go +++ b/sdks/go/pkg/beam/runners/dataflow/dataflowlib/job_test.go @@ -166,7 +166,7 @@ func TestCurrentStateMessage(t *testing.T) { want string wantErr error }{ - {state: "JOB_STATE_DONE", want: "Job JorbID-09876 succeeded", term: true}, + {state: "JOB_STATE_DONE", want: "Job JorbID-09876 succeeded!", term: true}, {state: "JOB_STATE_DRAINED", want: "Job JorbID-09876 drained", term: true}, {state: "JOB_STATE_UPDATED", want: "Job JorbID-09876 updated", term: true}, {state: "JOB_STATE_CANCELLED", want: "Job JorbID-09876 cancelled", term: true},