[#24789] Make Prism the default Go SDK runner. #27703

Merged on Aug 2, 2023 (6 commits)
Changes from 3 commits
1 change: 1 addition & 0 deletions sdks/go/examples/large_wordcount/large_wordcount.go
@@ -73,6 +73,7 @@ import (
_ "github.com/apache/beam/sdks/v2/go/pkg/beam/runners/direct"
_ "github.com/apache/beam/sdks/v2/go/pkg/beam/runners/dot"
_ "github.com/apache/beam/sdks/v2/go/pkg/beam/runners/flink"
_ "github.com/apache/beam/sdks/v2/go/pkg/beam/runners/prism"
_ "github.com/apache/beam/sdks/v2/go/pkg/beam/runners/samza"
_ "github.com/apache/beam/sdks/v2/go/pkg/beam/runners/spark"
_ "github.com/apache/beam/sdks/v2/go/pkg/beam/runners/universal"
6 changes: 3 additions & 3 deletions sdks/go/examples/minimal_wordcount/minimal_wordcount.go
@@ -62,7 +62,7 @@ import (

"github.com/apache/beam/sdks/v2/go/pkg/beam"
"github.com/apache/beam/sdks/v2/go/pkg/beam/io/textio"
"github.com/apache/beam/sdks/v2/go/pkg/beam/runners/direct"
"github.com/apache/beam/sdks/v2/go/pkg/beam/runners/prism"
"github.com/apache/beam/sdks/v2/go/pkg/beam/transforms/stats"

_ "github.com/apache/beam/sdks/v2/go/pkg/beam/io/filesystem/gcs"
@@ -119,6 +119,6 @@ func main() {
// formatted strings) to a text file.
textio.Write(s, "wordcounts.txt", formatted)

// Run the pipeline on the direct runner.
direct.Execute(context.Background(), p)
// Run the pipeline on the prism runner.
prism.Execute(context.Background(), p)
}
2 changes: 1 addition & 1 deletion sdks/go/examples/snippets/10metrics.go
@@ -34,7 +34,7 @@ func queryMetrics(pr beam.PipelineResult, ns, n string) metrics.QueryResults {

// [END metrics_query]

var runner = "direct"
var runner = "prism"

// [START metrics_pipeline]

4 changes: 0 additions & 4 deletions sdks/go/pkg/beam/runner.go
@@ -22,10 +22,6 @@ import (
"github.com/apache/beam/sdks/v2/go/pkg/beam/log"
)

// TODO(herohde) 7/6/2017: do we want to make the selected runner visible to
// transformations? That would allow runner-dependent operations or
// verification, but require that it is stored in Init and used for Run.

var (
runners = make(map[string]func(ctx context.Context, p *Pipeline) (PipelineResult, error))
)
3 changes: 3 additions & 0 deletions sdks/go/pkg/beam/runners/direct/direct.go
@@ -15,6 +15,9 @@

// Package direct contains the direct runner for running single-bundle
// pipelines in the current process. Useful for testing.
//
// Deprecated: Use prism as a local runner instead.
// Reliance on the direct runner leads to non-portable pipelines.
package direct

import (
20 changes: 10 additions & 10 deletions sdks/go/pkg/beam/runners/prism/README.md
@@ -30,8 +30,8 @@ single machine use.

For Go SDK users:
- `import "github.com/apache/beam/sdks/v2/go/pkg/beam/runners/prism"`
- Short term: set runner to "prism" to use it, or invoke directly.
- Medium term: switch the default from "direct" to "prism".
- Short term: set runner to "prism" to use it, or invoke directly. ☑
- Medium term: switch the default from "direct" to "prism". ☑
- Long term: alias "direct" to "prism", and delete legacy Go direct runner.

Prisms allow breaking apart and separating a beam of light into
@@ -118,7 +118,7 @@ can have features selectively disabled to ensure

## Current Limitations

* Experimental and testing use only.
* Testing use only.
* Executing docker containers isn't yet implemented.
* This precludes running the Java and Python SDKs, or their transforms for Cross Language.
* Loopback execution only.
@@ -127,7 +127,6 @@ can have features selectively disabled to ensure
* Not yet suitable for larger jobs, which may have intermediate data that exceeds memory bounds.
* Doesn't yet support sufficient intermediate data garbage collection for indefinite stream processing.
* Doesn't yet execute all beam pipeline features.
* No UI for job status inspection.

## Implemented so far.

@@ -140,18 +139,24 @@ can have features selectively disabled to ensure
* Global Window
* Interval Windowing
* Session Windows.
* CoGBKs
* Combines lifted and unlifted.
* Expands Splittable DoFns
* Process Continuations (AKA Streaming transform support)
* Limited support for Process Continuations
* Residuals are rescheduled for execution immediately.
* The transform must be finite (and eventually return a stop process continuation)
* Basic Metrics support
* Stand alone execution support
* Web UI available when run as a standalone command.
* Progress tracking
* Channel Splitting
* Dynamic Splitting
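
The Process Continuations items above (residuals rescheduled immediately, transform must be finite) can be pictured with a toy model. This is only a sketch under stated assumptions and is not how prism is implemented: `work`, `processBundle`, and `runToCompletion` are hypothetical names, and a restriction is modeled as a plain integer range.

```go
package main

// work is a hypothetical restriction: a half-open range [Start, End) of
// positions that still need processing.
type work struct{ Start, End int }

// processBundle handles at most batch positions and then checkpoints.
// It returns how many positions it processed, the residual range, and
// done=true once nothing remains (the analogue of a stop continuation).
func processBundle(w work, batch int) (n int, residual work, done bool) {
	end := w.Start + batch
	if end >= w.End {
		return w.End - w.Start, work{}, true
	}
	return batch, work{Start: end, End: w.End}, false
}

// runToCompletion re-runs each residual immediately until the work reports
// done. The loop terminates only because the range is finite.
func runToCompletion(w work, batch int) (processed, bundles int) {
	if batch < 1 {
		batch = 1 // guard: a non-positive batch would never make progress
	}
	for {
		n, residual, done := processBundle(w, batch)
		processed += n
		bundles++
		if done {
			return processed, bundles
		}
		w = residual
	}
}
```

In this model a range of 10 positions with a batch of 4 takes three bundles (4, 4, then 2). A transform that never reports done would loop forever, which is why the list requires a finite transform.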

## Next feature short list (unordered)

See https://github.com/apache/beam/issues/24789 for current status.

* Resolve watermark advancement for Process Continuations
* Test Stream
* Triggers & Complex Windowing Strategy execution.
* State
@@ -162,11 +167,6 @@ See https://github.com/apache/beam/issues/24789 for current status.
* FnAPI Optimizations
* Fusion
* Data with ProcessBundleRequest & Response
* Progress tracking
* Channel Splitting
* Dynamic Splitting
* Stand alone execution support
* UI reporting of in progress jobs

This is not a comprehensive feature set, but a set of goals to best
support users of the Go SDK in testing their pipelines.
9 changes: 5 additions & 4 deletions sdks/go/pkg/beam/testing/ptest/ptest.go
@@ -25,12 +25,13 @@ import (
"github.com/apache/beam/sdks/v2/go/pkg/beam"
"github.com/apache/beam/sdks/v2/go/pkg/beam/runners" // common runner flag.

// ptest uses the direct runner to execute pipelines by default.
// ptest uses the prism runner to execute pipelines by default,
// but also imports the direct runner as a legacy fallback so that
// users can override the default back to the direct runner.
_ "github.com/apache/beam/sdks/v2/go/pkg/beam/runners/direct"
_ "github.com/apache/beam/sdks/v2/go/pkg/beam/runners/prism"
)

// TODO(herohde) 7/10/2017: add hooks to verify counters, logs, etc.

// Create creates a pipeline and a PCollection with the given values.
func Create(values []any) (*beam.Pipeline, beam.Scope, beam.PCollection) {
p := beam.NewPipeline()
@@ -65,7 +66,7 @@ func CreateList2(a, b any) (*beam.Pipeline, beam.Scope, beam.PCollection, beam.P
// to function.
var (
Runner = runners.Runner
defaultRunner = "direct"
defaultRunner = "prism"
mainCalled = false
)

4 changes: 4 additions & 0 deletions sdks/go/pkg/beam/testing/ptest/ptest_test.go
@@ -21,6 +21,10 @@ import (
"github.com/apache/beam/sdks/v2/go/pkg/beam/testing/passert"
)

func TestMain(m *testing.M) {
Main(m)
}

func TestCreate(t *testing.T) {
inputs := []any{"a", "b", "c"}
p, s, col := Create(inputs)
5 changes: 3 additions & 2 deletions sdks/go/pkg/beam/x/beamx/run.go
@@ -32,14 +32,15 @@ import (
_ "github.com/apache/beam/sdks/v2/go/pkg/beam/runners/direct"
_ "github.com/apache/beam/sdks/v2/go/pkg/beam/runners/dot"
_ "github.com/apache/beam/sdks/v2/go/pkg/beam/runners/flink"
_ "github.com/apache/beam/sdks/v2/go/pkg/beam/runners/prism"
_ "github.com/apache/beam/sdks/v2/go/pkg/beam/runners/samza"
_ "github.com/apache/beam/sdks/v2/go/pkg/beam/runners/spark"
_ "github.com/apache/beam/sdks/v2/go/pkg/beam/runners/universal"
)

var (
runner = runners.Runner
defaultRunner = "direct"
defaultRunner = "prism"
)

func getRunner() string {
@@ -51,7 +52,7 @@ func getRunner() string {
}

// Run invokes beam.Run with the runner supplied by the flag "runner". It
// defaults to the direct runner, but all beam-distributed runners and textio
// defaults to the prism runner, but all beam-distributed runners and textio
// filesystems are implicitly registered.
func Run(ctx context.Context, p *beam.Pipeline) error {
_, err := beam.Run(ctx, getRunner(), p)
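
The body of `getRunner` is elided in this diff, but the surrounding code implies that an unset `runner` flag falls back to `defaultRunner`, which this PR changes to "prism". A minimal sketch of that fallback using only the standard `flag` package follows. `chooseRunner` is a hypothetical helper that parses its own `FlagSet`, whereas the real code reads the shared flag from the `runners` package.

```go
package main

import "flag"

// defaultRunner is used whenever no runner is named explicitly.
const defaultRunner = "prism"

// chooseRunner parses args with a private FlagSet, so the sketch does not
// touch global flag state, and returns the requested runner or the default.
func chooseRunner(args []string) (string, error) {
	fs := flag.NewFlagSet("pipeline", flag.ContinueOnError)
	runner := fs.String("runner", "", "Pipeline runner.")
	if err := fs.Parse(args); err != nil {
		return "", err
	}
	if *runner == "" {
		return defaultRunner, nil
	}
	return *runner, nil
}
```

With no arguments this sketch selects "prism", and passing `--runner=direct` still selects the legacy direct runner, which matches the override path the ptest import comment describes.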
37 changes: 36 additions & 1 deletion sdks/go/test/integration/integration.go
@@ -136,6 +136,39 @@ var portableFilters = []string{
"TestSetStateClear",
}

// TODO(lostluck): set up a specific run for these.
var prismFilters = []string{
// The prism runner does not yet support the TestStream primitive.
"TestTestStream.*",
// The trigger and pane tests use TestStream.
"TestTrigger.*",
"TestPanes",
// TODO(https://github.com/apache/beam/issues/21058): Python portable runner times out on Kafka reads.
"TestKafkaIO.*",
// TODO(BEAM-13215): GCP IOs currently do not work in non-Dataflow portable runners.
"TestBigQueryIO.*",
"TestSpannerIO.*",
// The prism runner does not support self-checkpointing.
"TestCheckpointing",
// The prism runner does not support pipeline drain for SDF.
"TestDrain",
// FhirIO currently only supports Dataflow runner
"TestFhirIO.*",
// OOMs currently only lead to heap dumps on Dataflow runner
"TestOomParDo",
// The prism runner does not support user state.
"TestValueState",
"TestValueStateWindowed",
"TestValueStateClear",
"TestBagState",
"TestBagStateClear",
"TestCombiningState",
"TestMapState",
"TestMapStateClear",
"TestSetState",
"TestSetStateClear",
}

var flinkFilters = []string{
// TODO(https://github.com/apache/beam/issues/20723): Flink tests timing out on reads.
"TestXLang_Combine.*",
@@ -249,7 +282,7 @@ var dataflowFilters = []string{
"TestCheckpointing",
// TODO(21761): This test needs to provide GCP project to expansion service.
"TestBigQueryIO_BasicWriteQueryRead",
// Can't handle the test spanner container or access a local spanner.
// Can't handle the test spanner container or access a local spanner.
"TestSpannerIO.*",
// Dataflow does not drain jobs by itself.
"TestDrain",
@@ -292,6 +325,8 @@ func CheckFilters(t *testing.T) {
switch runner {
case "direct", "DirectRunner":
filters = directFilters
case "prism", "PrismRunner":
filters = prismFilters
case "portable", "PortableRunner":
filters = portableFilters
case "flink", "FlinkRunner":
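
The new `case "prism", "PrismRunner"` selects `prismFilters`, whose entries read as regular expressions over test names. A minimal sketch of how such a filter list could decide whether to skip a test follows. It assumes unanchored matching via `regexp.MatchString`; `filtersFor` and `shouldSkip` are hypothetical names, and the pattern list is a short excerpt rather than the full set.

```go
package main

import "regexp"

// filtersFor returns the skip patterns for a runner name. Only a short
// excerpt of the prism list is reproduced here for illustration.
func filtersFor(runner string) []string {
	switch runner {
	case "prism", "PrismRunner":
		return []string{"TestTestStream.*", "TestTrigger.*", "TestPanes"}
	}
	return nil
}

// shouldSkip reports whether testName matches any pattern for the runner.
// Matching is unanchored, so a pattern may match anywhere in the name.
func shouldSkip(runner, testName string) (bool, error) {
	for _, pattern := range filtersFor(runner) {
		matched, err := regexp.MatchString(pattern, testName)
		if err != nil {
			return false, err
		}
		if matched {
			return true, nil
		}
	}
	return false, nil
}
```

Because the match is unanchored in this sketch, a plain entry such as "TestValueState" would also cover "TestValueStateWindowed"; listing both, as `prismFilters` does, is redundant under that assumption but harmless.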