Skip to content

Commit

Permalink
quick first pass
Browse files Browse the repository at this point in the history
  • Loading branch information
lostluck committed Aug 3, 2023
1 parent 51124cc commit 864c042
Show file tree
Hide file tree
Showing 5 changed files with 10 additions and 18 deletions.
2 changes: 1 addition & 1 deletion dev-support/docker/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ RUN pip3 install distlib==0.3.1 yapf==0.29.0 pytest
###
# Install Go
###
ENV DOWNLOAD_GO_VERSION=1.20.5
ENV DOWNLOAD_GO_VERSION=1.20.6
RUN wget https://golang.org/dl/go${DOWNLOAD_GO_VERSION}.linux-amd64.tar.gz && \
tar -C /usr/local -xzf go${DOWNLOAD_GO_VERSION}.linux-amd64.tar.gz
ENV GOROOT /usr/local/go
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ func (s *Server) Prepare(ctx context.Context, req *jobpb.PrepareJobRequest) (*jo
}

// Inspect Windowing strategies for unsupported features.
for wsID, ws := range job.Pipeline.GetComponents().GetWindowingStrategies() {
for _, ws := range job.Pipeline.GetComponents().GetWindowingStrategies() {
check("WindowingStrategy.AllowedLateness", ws.GetAllowedLateness(), int64(0))
check("WindowingStrategy.ClosingBehaviour", ws.GetClosingBehavior(), pipepb.ClosingBehavior_EMIT_IF_NONEMPTY)
check("WindowingStrategy.AccumulationMode", ws.GetAccumulationMode(), pipepb.AccumulationMode_DISCARDING)
Expand Down
7 changes: 1 addition & 6 deletions sdks/go/pkg/beam/runners/prism/internal/stage.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,11 +134,6 @@ progress:
progTick.Stop()
break progress // exit progress loop on close.
case <-progTick.C:
resp, err := b.Progress(wk)
if err != nil {
slog.Debug("SDK Error from progress, aborting progress", "bundle", rb, "error", err.Error())
break progress
}
resp, err := b.Progress(wk)
if err != nil {
slog.Debug("SDK Error from progress, aborting progress", "bundle", rb, "error", err.Error())
Expand Down Expand Up @@ -410,7 +405,7 @@ func handleSideInputs(tid string, t *pipepb.PTransform, comps *pipepb.Components
for _, prep := range prepSides {
prep(b, watermark)
}
}, nil
}, nil
}

// handleSideInput returns a closure that will look up the data for a side input appropriate for the given watermark.
Expand Down
2 changes: 1 addition & 1 deletion sdks/go/pkg/beam/transforms/stats/quantiles.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ func init() {
beam.RegisterCoder(weightedElementType, encodeWeightedElement, decodeWeightedElement)

register.Function1x2(fixedKey)
register.Function2x1(makeWeightedElement) // TODO make prism fail faster when this is commented out.
//register.Function2x1(makeWeightedElement) // TODO make prism fail faster when this is commented out.
}

// Opts contains settings used to configure how approximate quantiles are computed.
Expand Down
15 changes: 6 additions & 9 deletions sdks/go/test/integration/integration.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,25 +137,24 @@ var portableFilters = []string{
}

var prismFilters = []string{
// The portable runner does not support the TestStream primitive
// The prism runner does not support the TestStream primitive
"TestTestStream.*",
// The trigger and pane tests uses TestStream
"TestTrigger.*",
"TestPanes",
// TODO(https://github.com/apache/beam/issues/21058): Python portable runner times out on Kafka reads.

// TODO(https://github.com/apache/beam/issues/21058): Xlang ios don't yet work on prism.
"TestKafkaIO.*",
// TODO(BEAM-13215): GCP IOs currently do not work in non-Dataflow portable runners.
// TODO(BEAM-13215): GCP IOs currently do not work in non-Dataflow prism runners.
"TestBigQueryIO.*",
"TestSpannerIO.*",
// The portable runner does not support self-checkpointing
"TestCheckpointing",
// The portable runner does not support pipeline drain for SDF.
// The prism runner does not support pipeline drain for SDF.
"TestDrain",
// FhirIO currently only supports Dataflow runner
"TestFhirIO.*",
// OOMs currently only lead to heap dumps on Dataflow runner
"TestOomParDo",
// The portable runner does not support user state.
// The prism runner does not support user state.
"TestValueState",
"TestValueStateWindowed",
"TestValueStateClear",
Expand Down Expand Up @@ -328,8 +327,6 @@ func CheckFilters(t *testing.T) {
filters = prismFilters
case "portable", "PortableRunner":
filters = portableFilters
case "prism", "PrismRunner":
filters = prismFilters
case "flink", "FlinkRunner":
filters = flinkFilters
case "samza", "SamzaRunner":
Expand Down

0 comments on commit 864c042

Please sign in to comment.