diff --git a/sdks/go/pkg/beam/runners/dataflow/dataflowlib/job.go b/sdks/go/pkg/beam/runners/dataflow/dataflowlib/job.go index 6aa0712daece..459c3f52a4b0 100644 --- a/sdks/go/pkg/beam/runners/dataflow/dataflowlib/job.go +++ b/sdks/go/pkg/beam/runners/dataflow/dataflowlib/job.go @@ -240,29 +240,46 @@ func WaitForCompletion(ctx context.Context, client *df.Service, project, region, return errors.Wrap(err, "failed to get job") } - switch j.CurrentState { - case "JOB_STATE_DONE": - log.Info(ctx, "Job succeeded!") - return nil - - case "JOB_STATE_CANCELLED": - log.Info(ctx, "Job cancelled") + terminal, msg, err := currentStateMessage(j.CurrentState, jobID) + if err != nil { + return err + } + log.Infof(ctx, msg) + if terminal { return nil - - case "JOB_STATE_FAILED": - return errors.Errorf("job %s failed", jobID) - - case "JOB_STATE_RUNNING": - log.Info(ctx, "Job still running ...") - - default: - log.Infof(ctx, "Job state: %v ...", j.CurrentState) } time.Sleep(30 * time.Second) } } +// currentStateMessage indicates if the state is terminal, and provides a message to log, or an error. +// Errors are always terminal. +func currentStateMessage(currentState, jobID string) (bool, string, error) { + switch currentState { + // Add all Terminal Success stats here. + case "JOB_STATE_DONE", "JOB_STATE_CANCELLED", "JOB_STATE_DRAINED", "JOB_STATE_UPDATED": + var state string + switch currentState { + case "JOB_STATE_DONE": + state = "succeeded!" + case "JOB_STATE_CANCELLED": + state = "cancelled" + case "JOB_STATE_DRAINED": + state = "drained" + case "JOB_STATE_UPDATED": + state = "updated" + } + return true, fmt.Sprintf("Job %v %v", jobID, state), nil + case "JOB_STATE_FAILED": + return true, "", errors.Errorf("Job %s failed", jobID) + case "JOB_STATE_RUNNING": + return false, "Job still running ...", nil + default: + return false, fmt.Sprintf("Job state: %v ...", currentState), nil + } +} + // NewClient creates a new dataflow client with default application credentials // and CloudPlatformScope. The Dataflow endpoint is optionally overridden. func NewClient(ctx context.Context, endpoint string) (*df.Service, error) { diff --git a/sdks/go/pkg/beam/runners/dataflow/dataflowlib/job_test.go b/sdks/go/pkg/beam/runners/dataflow/dataflowlib/job_test.go index 1bf178f2700c..1a366e7bebf0 100644 --- a/sdks/go/pkg/beam/runners/dataflow/dataflowlib/job_test.go +++ b/sdks/go/pkg/beam/runners/dataflow/dataflowlib/job_test.go @@ -17,6 +17,7 @@ package dataflowlib import ( "context" + "fmt" "reflect" "testing" ) @@ -157,3 +158,39 @@ func TestValidateWorkerSettings(t *testing.T) { }) } } + +func TestCurrentStateMessage(t *testing.T) { + tests := []struct { + state string + term bool + want string + wantErr error + }{ + {state: "JOB_STATE_DONE", want: "Job JorbID-09876 succeeded!", term: true}, + {state: "JOB_STATE_DRAINED", want: "Job JorbID-09876 drained", term: true}, + {state: "JOB_STATE_UPDATED", want: "Job JorbID-09876 updated", term: true}, + {state: "JOB_STATE_CANCELLED", want: "Job JorbID-09876 cancelled", term: true}, + {state: "JOB_STATE_RUNNING", want: "Job still running ...", term: false}, + {state: "JOB_STATE_FAILED", wantErr: fmt.Errorf("Job JorbID-09876 failed"), term: true}, + {state: "Ossiphrage", want: "Job state: Ossiphrage ...", term: false}, + } + for _, test := range tests { + t.Run(test.state, func(t *testing.T) { + const jobID = "JorbID-09876" + term, got, err := currentStateMessage(test.state, jobID) + if term != test.term { + termGot, termWant := "false (continues)", "true (terminal)" + if !test.term { + termGot, termWant = termWant, termGot + } + t.Errorf("currentStateMessage(%v, %q) = %v, want %v", test.state, jobID, termGot, termWant) + } + if err != nil && err.Error() != test.wantErr.Error() { + t.Errorf("currentStateMessage(%v, %q) = %v, want %v", test.state, jobID, err, test.wantErr) + } + if got != test.want { + t.Errorf("currentStateMessage(%v, %q) = %v, want %v", test.state, jobID, got, test.want) + } + }) + } +}