Skip to content

Commit

Permalink
chang cleanup
Browse files Browse the repository at this point in the history
  • Loading branch information
lostluck committed Aug 3, 2023
1 parent 864c042 commit a9474db
Show file tree
Hide file tree
Showing 20 changed files with 42 additions and 145 deletions.
4 changes: 2 additions & 2 deletions sdks/go/pkg/beam/core/runtime/harness/harness.go
Original file line number Diff line number Diff line change
Expand Up @@ -394,7 +394,7 @@ func (c *control) handleInstruction(ctx context.Context, req *fnpb.InstructionRe

if err != nil {
c.failed[instID] = err
return fail(ctx, instID, "process bundle failed for instruction %v using plan %v : %v", instID, bdID, err)
return fail(ctx, instID, "ProcessBundle failed: %v", err)
}

tokens := msg.GetCacheTokens()
Expand Down Expand Up @@ -426,7 +426,7 @@ func (c *control) handleInstruction(ctx context.Context, req *fnpb.InstructionRe
c.failed[instID] = err
} else if dataError != io.EOF && dataError != nil {
// If there was an error on the data channel reads, fail this bundle
// since we may have had a short read.'
// since we may have had a short read.
c.failed[instID] = dataError
err = dataError
} else {
Expand Down
2 changes: 1 addition & 1 deletion sdks/go/pkg/beam/runners/prism/internal/execute.go
Original file line number Diff line number Diff line change
Expand Up @@ -300,7 +300,7 @@ func getWindowValueCoders(comps *pipepb.Components, col *pipepb.PCollection, cod
wcID := lpUnknownCoders(ws.GetWindowCoderId(), coders, comps.GetCoders())
return makeWindowCoders(coders[wcID])
}

func getOnlyValue[K comparable, V any](in map[K]V) V {
if len(in) != 1 {
panic(fmt.Sprintf("expected single value map, had %v - %v", len(in), in))
Expand Down
2 changes: 0 additions & 2 deletions sdks/go/pkg/beam/runners/prism/internal/execute_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
// See the License for the specific language governing permissions and
// limitations under the License.

package internal_test
package internal_test

import (
Expand Down Expand Up @@ -41,7 +40,6 @@ import (
func initRunner(t *testing.T) {
t.Helper()
if *jobopts.Endpoint == "" {
s := jobservices.NewServer(0, internal.RunPipeline)
s := jobservices.NewServer(0, internal.RunPipeline)
*jobopts.Endpoint = s.Endpoint()
go s.Serve()
Expand Down
10 changes: 7 additions & 3 deletions sdks/go/pkg/beam/runners/prism/internal/handlerunner.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,10 @@ func (h *runner) PrepareTransform(tid string, t *pipepb.PTransform, comps *pipep
// TODO: Implement the windowing strategy the "backup" transforms used for Reshuffle.
// TODO: Implement a fusion break for reshuffles.

if h.config.SDKReshuffle {
panic("SDK side reshuffle not yet supported")
}

// A Reshuffle, in principle, is a no-op on the pipeline structure, WRT correctness.
// It could however affect performance, so it exists to tell the runner that this
// point in the pipeline needs a fusion break, to enable the pipeline to change it's
Expand All @@ -87,11 +91,11 @@ func (h *runner) PrepareTransform(tid string, t *pipepb.PTransform, comps *pipep
// since the input collection and output collection types match.

// Get the input and output PCollections, there should only be 1 each.
if len(t.GetOutputs()) != 1 {
panic("Expected single putput PCollection in reshuffle: " + prototext.Format(t))
if len(t.GetInputs()) != 1 {
panic("Expected single input PCollection in reshuffle: " + prototext.Format(t))
}
if len(t.GetOutputs()) != 1 {
panic("Expected single putput PCollection in reshuffle: " + prototext.Format(t))
panic("Expected single output PCollection in reshuffle: " + prototext.Format(t))
}

inColID := getOnlyValue(t.GetInputs())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -145,21 +145,21 @@ func (s *Server) Prepare(ctx context.Context, req *jobpb.PrepareJobRequest) (*jo
}

// Inspect Windowing strategies for unsupported features.
for _, ws := range job.Pipeline.GetComponents().GetWindowingStrategies() {
for wsID, ws := range job.Pipeline.GetComponents().GetWindowingStrategies() {
check("WindowingStrategy.AllowedLateness", ws.GetAllowedLateness(), int64(0))
check("WindowingStrategy.ClosingBehaviour", ws.GetClosingBehavior(), pipepb.ClosingBehavior_EMIT_IF_NONEMPTY)
check("WindowingStrategy.AccumulationMode", ws.GetAccumulationMode(), pipepb.AccumulationMode_DISCARDING)
if ws.GetWindowFn().GetUrn() != urns.WindowFnSession {
check("WindowingStrategy.MergeStatus", ws.GetMergeStatus(), pipepb.MergeStatus_NON_MERGING)
}
// These are used by reshuffle
// TODO have a more aware blocking for reshuffle specifically.
// check("WindowingStrategy.OnTimeBehavior", ws.GetOnTimeBehavior(), pipepb.OnTimeBehavior_FIRE_IF_NONEMPTY)
// check("WindowingStrategy.OutputTime", ws.GetOutputTime(), pipepb.OutputTime_END_OF_WINDOW)
// // Non nil triggers should fail.
// if ws.GetTrigger().GetDefault() == nil {
// check("WindowingStrategy.Trigger", ws.GetTrigger(), &pipepb.Trigger_Default{})
// }
if !bypassedWindowingStrategies[wsID] {
check("WindowingStrategy.OnTimeBehavior", ws.GetOnTimeBehavior(), pipepb.OnTimeBehavior_FIRE_IF_NONEMPTY)
check("WindowingStrategy.OutputTime", ws.GetOutputTime(), pipepb.OutputTime_END_OF_WINDOW)
// Non nil triggers should fail.
if ws.GetTrigger().GetDefault() == nil {
check("WindowingStrategy.Trigger", ws.GetTrigger(), &pipepb.Trigger_Default{})
}
}
}
if len(errs) > 0 {
jErr := &joinError{errs: errs}
Expand Down
1 change: 0 additions & 1 deletion sdks/go/pkg/beam/runners/prism/internal/preprocess.go
Original file line number Diff line number Diff line change
Expand Up @@ -244,7 +244,6 @@ func defaultFusion(topological []string, comps *pipepb.Components) []*stage {
continue
}
cs := pcolConsumers[pcolID]
fmt.Printf("XXXXXX Fusing %v, with %v\n", tid, cs)
for _, c := range cs {
stg.transforms = append(stg.transforms, c.transform)
consumed[c.transform] = true
Expand Down
2 changes: 1 addition & 1 deletion sdks/go/pkg/beam/runners/prism/internal/preprocess_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ func Test_preprocessor_preProcessGraph(t *testing.T) {
pre := newPreprocessor([]transformPreparer{&testPreparer{}})

gotStages := pre.preProcessGraph(test.input)
if diff := cmp.Diff(test.wantStages, gotStages, cmp.AllowUnexported(stage{}), cmp.AllowUnexported(link{}), cmpopts.EquateEmpty()); diff != "" {
if diff := cmp.Diff(test.wantStages, gotStages, cmp.AllowUnexported(stage{}, link{}), cmpopts.EquateEmpty()); diff != "" {
t.Errorf("preProcessGraph(%q) stages diff (-want,+got)\n%v", test.name, diff)
}

Expand Down
30 changes: 0 additions & 30 deletions sdks/go/pkg/beam/runners/prism/internal/stage.go
Original file line number Diff line number Diff line change
Expand Up @@ -378,36 +378,6 @@ func buildDescriptor(stg *stage, comps *pipepb.Components, wk *worker.W) error {
return nil
}

// handleSideInputs ensures appropriate coders are available to the bundle, and prepares a function to stage the data.
func handleSideInputs(tid string, t *pipepb.PTransform, comps *pipepb.Components, coders map[string]*pipepb.Coder, wk *worker.W, replacements map[string]string) (func(b *worker.B, tid string, watermark mtime.Time), error) {
sis, err := getSideInputs(t)
if err != nil {
return nil, err
}
var prepSides []func(b *worker.B, watermark mtime.Time)

// Get WindowedValue Coders for the transform's input and output PCollections.
for local, global := range t.GetInputs() {
_, ok := sis[local]
if !ok {
continue // This is the main input.
}
if oldGlobal, ok := replacements[global]; ok {
global = oldGlobal
}
prepSide, err := handleSideInput(tid, local, global, comps, coders, wk)
if err != nil {
return nil, err
}
prepSides = append(prepSides, prepSide)
}
return func(b *worker.B, tid string, watermark mtime.Time) {
for _, prep := range prepSides {
prep(b, watermark)
}
}, nil
}

// handleSideInput returns a closure that will look up the data for a side input appropriate for the given watermark.
func handleSideInput(tid, local, global string, comps *pipepb.Components, coders map[string]*pipepb.Coder, wk *worker.W) (func(b *worker.B, watermark mtime.Time), error) {
t := comps.GetTransforms()[tid]
Expand Down
51 changes: 0 additions & 51 deletions sdks/go/pkg/beam/runners/prism/internal/stateful_test.go

This file was deleted.

16 changes: 0 additions & 16 deletions sdks/go/pkg/beam/runners/prism/internal/testdofns_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
// See the License for the specific language governing permissions and
// limitations under the License.

package internal_test
package internal_test

import (
Expand All @@ -22,11 +21,6 @@ import (
"sort"
"time"

"context"
"fmt"
"sort"
"time"

"github.com/apache/beam/sdks/v2/go/pkg/beam"
"github.com/apache/beam/sdks/v2/go/pkg/beam/core/sdf"
"github.com/apache/beam/sdks/v2/go/pkg/beam/io/rtrackers/offsetrange"
Expand Down Expand Up @@ -73,16 +67,6 @@ func init() {
register.Emitter2[int64, int64]()
}

// The Test DoFns live outside of the test files to get coverage information on DoFn
// Lifecycle method execution. This inflates binary size, but ensures the runner is
// exercising the expected feature set.
//
// Once there's enough confidence in the runner, we can move these into a dedicated testing
// package along with the pipelines that use them.

// Registrations should happen in the test files, so the compiler can prune these
// when they are not in use.

func dofnEmpty(imp []byte, emit func(int64)) {
}

Expand Down
16 changes: 0 additions & 16 deletions sdks/go/pkg/beam/runners/prism/internal/web/web_test.go

This file was deleted.

10 changes: 8 additions & 2 deletions sdks/go/pkg/beam/runners/prism/internal/worker/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -267,7 +267,10 @@ func (wk *W) Control(ctrl fnpb.BeamFnControl_ControlServer) error {
for {
select {
case req := <-wk.InstReqs:
ctrl.Send(req)
err := ctrl.Send(req)
if err != nil {
return err
}
case <-ctrl.Context().Done():
slog.Debug("Control context canceled")
return ctrl.Context().Err()
Expand Down Expand Up @@ -323,7 +326,10 @@ func (wk *W) Data(data fnpb.BeamFnData_DataServer) error {
}()
for {
select {
case req := <-wk.DataReqs:
case req, ok := <-wk.DataReqs:
if !ok {
return nil
}
if err := data.Send(req); err != nil {
slog.LogAttrs(context.TODO(), slog.LevelDebug, "data.Send error", slog.Any("error", err))
}
Expand Down
3 changes: 2 additions & 1 deletion sdks/go/pkg/beam/runners/universal/runnerlib/execute.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,8 @@ func Execute(ctx context.Context, p *pipepb.Pipeline, endpoint string, opt *JobO
bin = worker
}
} else if opt.Loopback {
// TODO, determine the canonical location for Beam temp files.
// TODO(https://github.com/apache/beam/issues/27569: determine the canonical location for Beam temp files.
// In loopback mode, the binary is unused, so we can avoid an unnecessary compile step.
f, _ := os.CreateTemp(os.TempDir(), "beamloopbackworker-*")
bin = f.Name()
} else {
Expand Down
4 changes: 0 additions & 4 deletions sdks/go/pkg/beam/testing/passert/count_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,6 @@ import (
"github.com/apache/beam/sdks/v2/go/pkg/beam/testing/ptest"
)

func TestMain(m *testing.M) {
ptest.Main(m)
}

func TestCount(t *testing.T) {
var tests = []struct {
name string
Expand Down
2 changes: 1 addition & 1 deletion sdks/go/pkg/beam/testing/passert/equals_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,7 @@ func ExampleEqualsList_mismatch() {
list := [3]string{"wrong", "inputs", "here"}

EqualsList(s, col, list)
ptest.DefaultRunner()

err := ptest.Run(p)
err = unwrapError(err)

Expand Down
1 change: 1 addition & 0 deletions sdks/go/pkg/beam/testing/passert/floats.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ func init() {
register.DoFn2x1[[]byte, func(*beam.T) bool, error]((*boundsFn)(nil))
register.DoFn3x1[[]byte, func(*beam.T) bool, func(*beam.T) bool, error]((*thresholdFn)(nil))
register.Emitter1[beam.T]()
register.Iter1[beam.T]()
}

// EqualsFloat calls into TryEqualsFloat, checkong that two PCollections of non-complex
Expand Down
4 changes: 4 additions & 0 deletions sdks/go/pkg/beam/testing/passert/passert_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,10 @@ import (
"github.com/apache/beam/sdks/v2/go/pkg/beam/testing/ptest"
)

func TestMain(m *testing.M) {
ptest.Main(m)
}

func isA(input string) bool { return input == "a" }
func isB(input string) bool { return input == "b" }
func lessThan13(input int) bool { return input < 13 }
Expand Down
3 changes: 2 additions & 1 deletion sdks/go/pkg/beam/testing/ptest/ptest.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,8 @@ import (
"github.com/apache/beam/sdks/v2/go/pkg/beam/runners" // common runner flag.

// ptest uses the prism runner to execute pipelines by default.
// but includes the direct runner for legacy fallback reasons.
// but includes the direct runner for legacy fallback reasons to
// support users overriding the default back to the direct runner.
_ "github.com/apache/beam/sdks/v2/go/pkg/beam/runners/direct"
_ "github.com/apache/beam/sdks/v2/go/pkg/beam/runners/prism"
)
Expand Down
6 changes: 3 additions & 3 deletions sdks/go/pkg/beam/transforms/filter/filter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,9 @@ func init() {
register.Function1x1(greaterThanOne)
}

func alwaysTrue(a int) bool { return true }
func alwaysFalse(a int) bool { return false }
func isOne(a int) bool { return a == 1 }
func alwaysTrue(a int) bool { return true }
func alwaysFalse(a int) bool { return false }
func isOne(a int) bool { return a == 1 }
func greaterThanOne(a int) bool { return a > 1 }

func TestInclude(t *testing.T) {
Expand Down
2 changes: 1 addition & 1 deletion sdks/go/test/integration/integration.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ var prismFilters = []string{

// TODO(https://github.com/apache/beam/issues/21058): Xlang ios don't yet work on prism.
"TestKafkaIO.*",
// TODO(BEAM-13215): GCP IOs currently do not work in non-Dataflow prism runners.
// TODO(BEAM-13215): GCP IOs currently do not work in non-Dataflow portable runners.
"TestBigQueryIO.*",
"TestSpannerIO.*",
// The prism runner does not support pipeline drain for SDF.
Expand Down

0 comments on commit a9474db

Please sign in to comment.