[Feature Request][Go SDK]: Support Slowly Changing Side Input Pattern #23106

Open
lostluck opened this issue Sep 8, 2022 · 8 comments
@lostluck
Contributor

lostluck commented Sep 8, 2022

What would you like to happen?

Requires implementing a Periodic Impulse and Periodic Sequence transform for occasional re-reads of some data, in a large window.

Requires using those re-reads as a side input.

Requires allowing the map_windows urn: https://github.com/apache/beam/blob/master/model/pipeline/src/main/proto/org/apache/beam/model/pipeline/v1/beam_runner_api.proto#L296

So that the SDK can map windows from one to another, this needs to be added to the big switch in exec/translate.go.

Currently, trying to manually use this pattern leads to a panic on execution when a runner makes the mapping request:

https://github.com/apache/beam/blob/master/sdks/go/pkg/beam/core/runtime/exec/translate.go#L734
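To make the mapping request concrete, here is a minimal sketch of what "mapping a window from one to another" could look like for fixed windows. It is not the SDK's code: the `interval` type and both function names are hypothetical, and it assumes the convention I understand Beam to use for partitioning window functions, where the side input window is the one containing the main window's maximum timestamp (end minus one millisecond).

```go
package main

import "fmt"

// interval is a hypothetical stand-in for an interval window.
// Bounds are milliseconds since the epoch, covering [start, end).
type interval struct {
	start, end int64
}

// fixedWindowFor returns the fixed window of the given size that contains ts.
// The double modulo keeps the result correct for negative timestamps.
func fixedWindowFor(tsMillis, sizeMillis int64) interval {
	offset := ((tsMillis % sizeMillis) + sizeMillis) % sizeMillis
	start := tsMillis - offset
	return interval{start: start, end: start + sizeMillis}
}

// mapWindow maps a main input window to a side input window.
// Assumption: the side input uses fixed windows, and the lookup key is the
// main window's maximum timestamp (end - 1).
func mapWindow(main interval, sideSizeMillis int64) interval {
	return fixedWindowFor(main.end-1, sideSizeMillis)
}

func main() {
	// A one-minute main window mapped onto five-minute side input windows.
	w := mapWindow(interval{start: 60000, end: 120000}, 300000)
	fmt.Printf("[%d, %d)\n", w.start, w.end) // prints "[0, 300000)"
}
```

In the real SDK this logic would live in an exec node configured from the decoded payload, rather than in free functions like these.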

Issue Priority

Priority: 1

Issue Component

Component: sdk-go

@manuzhang
Contributor

set to p2 since this is a feature request

@github-actions github-actions bot added the stale label Nov 8, 2022
@camphillips22
Contributor

How hard would it be to just add the map_windows urn support? I wrote a pipeline with a side input that isn't exactly the "slow changing side input" pattern, but the pipeline failed because map_windows isn't implemented.

For context, the pipeline was shaped approximately like:

flowchart TD

A[WindowInto] --> |"KV#lt;string, data#gt;"| B["Distinct(DropValue(col))"];
B --> C[Lookup configuration for keys];
C --> D["Map to KV#lt;string, config#gt;"];
D --> E
A --> |"KV#lt;string, data#gt;"| E["Emit new data based on config<br>ParDo(col, beam.SideInput(KV#lt;string, config#gt;)"];

@lostluck
Contributor Author

lostluck commented Nov 17, 2022 via email

@camphillips22
Contributor

Thanks for the feedback!

decoding the appropriate payload proto, and using that
information to configure an exec node that exists on the exec side of the
pipeline, in this case, taking a timestamp, and producing the appropriate
window for it, based on the provided windowing strategy (I assume).
Specific payload (likely a specific proto message) should be listed in the
proto next to the URN enum in beam_fn_api.proto.

I actually got this far myself, but I'm having trouble understanding how to write the node's ProcessElement. It's not clear to me what the shape of the data is here, and there's no other node that's similar, so it's taking some time to understand how to write that bit. From my experimentation, the FullValue is empty, so I'm assuming that I need to create the coders and evaluate the ReStream, but I haven't gotten to that yet.

As far as integration tests, I also took the approach you suggested with a pipeline that looks like:

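func MapWindows(s beam.Scope) {
	// makeTimestampedData (not shown) presumably emits each value with its own timestamp.
	col := beam.ParDo(s, &makeTimestampedData{Data: []int{4, 9, 2, 3, 5, 7, 8, 1, 6}}, beam.Impulse(s))
	windowed := beam.WindowInto(s, window.NewFixedWindows(3*time.Second), col)
	// Per-window mean, consumed below as a side input.
	mean := stats.Mean(s, windowed)
	filtered := beam.ParDo(s, filterAbove, windowed, beam.SideInput{Input: mean})
	// Re-window globally so one assertion covers the output of every window.
	globalFiltered := beam.WindowInto(s, window.NewGlobalWindows(), filtered)
	// Expect 4 surviving elements that sum to 30.
	passert.Sum(s, globalFiltered, "a", 4, 30)
}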
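As a sanity check on the expected values in that assertion, here is a plain Go sketch with no Beam dependency. It rests on two assumptions that the snippet above does not show: that makeTimestampedData spaces the elements one second apart so each 3-second window holds three consecutive values, and that filterAbove keeps values strictly greater than the window mean. The function name `filterAboveWindowMean` is hypothetical.

```go
package main

import "fmt"

// filterAboveWindowMean splits data into consecutive groups of perWindow
// elements (standing in for fixed windows), computes each group's mean, and
// keeps the values strictly above that mean.
func filterAboveWindowMean(data []int, perWindow int) []int {
	var kept []int
	for i := 0; i < len(data); i += perWindow {
		end := i + perWindow
		if end > len(data) {
			end = len(data)
		}
		win := data[i:end]
		sum := 0
		for _, v := range win {
			sum += v
		}
		mean := float64(sum) / float64(len(win))
		for _, v := range win {
			if float64(v) > mean {
				kept = append(kept, v)
			}
		}
	}
	return kept
}

func main() {
	kept := filterAboveWindowMean([]int{4, 9, 2, 3, 5, 7, 8, 1, 6}, 3)
	total := 0
	for _, v := range kept {
		total += v
	}
	fmt.Println(len(kept), total) // prints "4 30"
}
```

Every group has a mean of 5, so 9, 7, 8 and 6 survive, which matches the count of 4 and the sum of 30 passed to passert.Sum. A non-strict comparison would also keep the 5 and give 5 and 35 instead.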

The problem with this was that it never hit the map_windows URN. I'm not sure whether there's some difference between streaming and batch pipelines here. I also wasn't sure how to test a streaming pipeline in a test like this (presumably with the teststream package, but that's not supported by Dataflow), so I ended up slightly modifying the streaming wordcap example to test it on Dataflow. Not spending money would be great, though, so I'll try to get a local Flink set up.

@lostluck
Contributor Author

lostluck commented Nov 17, 2022 via email

@lostluck
Contributor Author

I've been told that one needs to start the local Flink runner in "checkpointing" mode in order to get the desired behavior out of it for streaming things. I don't know how to do that offhand, but I'd look into how the Python Streaming Load Tests set up their Flink instances.

I have also confirmed that a Streaming Splittable DoFn (one that returns ProcessContinuations) should be gracefully ending a streaming pipeline when all instances have returned a Stop, so we may have a tractable streaming testing scenario that doesn't require a forced cancellation.

I don't love that we need to have a different configuration for Flink to behave properly in streaming contexts, but that's a different problem.

@hnnsgstfssn
Contributor

I've added a draft of the periodic.Sequence and periodic.Impulse but I'm having some issues and need some guidance around testing.
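For readers unfamiliar with the idea, the core of a periodic sequence can be sketched in plain Go as below. This is not the code from the draft: the function name `periodicTimestamps` is hypothetical, timestamps are plain int64 milliseconds, and treating the end bound as inclusive is an assumption of this sketch. A real splittable DoFn would also wait for wall-clock time to reach each timestamp before emitting it, which is omitted here.

```go
package main

import "fmt"

// periodicTimestamps returns the event timestamps (milliseconds since the
// epoch) at which a periodic sequence would fire: start, start+interval, and
// so on, up to and including end. The slice index plays the role of the
// element's sequence index.
func periodicTimestamps(startMillis, endMillis, intervalMillis int64) []int64 {
	if intervalMillis <= 0 {
		return nil // a non-positive interval would never terminate
	}
	var out []int64
	for ts := startMillis; ts <= endMillis; ts += intervalMillis {
		out = append(out, ts)
	}
	return out
}

func main() {
	// One firing per minute over a three-minute span.
	for i, ts := range periodicTimestamps(0, 180000, 60000) {
		fmt.Printf("index=%d timestamp=%dms\n", i, ts)
	}
}
```

Each firing can then trigger a re-read of the slowly changing data, with the result windowed and passed downstream as a side input.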

#25808 includes an example that I'm using to run this on Dataflow.

The pipeline runs and seems to use the side input lookup correctly as it receives input. However, when draining, it spits out the following trace.

Error message from worker: generic::unknown: process bundle failed for instruction process_bundle-5-4 using plan drain-S02-11 : panic: runtime error: index out of range [2] with length 2
Full error:
while executing Process for Plan[drain-S02-11]:
2: DataSink[S[ptransform-9@localhost:12371]] Coder:W;coder-50>!IWC
3: PCollection[pcollection-32] Out:[2]
4: WindowInto[FIX[1m0s]]. Out:2
5: PCollection[pcollection-26] Out:[4]
6: ParDo[main.update] Out:[5] Sig: func(context.Context, mtime.Time, int64, func(int, string))
7: PCollection[pcollection-22] Out:[6]
8: SDF.ProcessSizedElementsAndRestrictions[periodic.sequenceGenDoFn] UID:8 Out:[7]
9: PCollection[pcollection-12-truncate-output] Out:[8]
10: SDF.TruncateSizedRestriction[periodic.sequenceGenDoFn] UID:10 Out:[9]
1: DataSource[S[ptransform-8@localhost:12371], 0] Out:10 Coder:W;coder-38,KV;coder-42>,double;coder-45>>!GWC
	caused by:
panic: runtime error: index out of range [2] with length 2
goroutine 44 [running]:
runtime/debug.Stack()
	/usr/lib/go/src/runtime/debug/stack.go:24 +0x65
github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/exec.callNoPanic.func1()
	/home/rru/temp/beam/sdks/go/pkg/beam/core/runtime/exec/util.go:58 +0xa5
panic({0xfb70a0, 0xc000137f08})
	/usr/lib/go/src/runtime/panic.go:884 +0x213
github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/exec.(*trInvoker).Invoke(0xc0001db0a0?, {0x11f5450?, 0xc00026b800?}, {0xf919c0?, 0xc000011728?}, 0xc0001db0a0?)
	/home/rru/temp/beam/sdks/go/pkg/beam/core/runtime/exec/sdf_invokers.go:320 +0x1b3
github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/exec.(*TruncateSizedRestriction).ProcessElement(0xc00026b7c0, {0x11f5450, 0xc00026b800}, 0xc0001dae00, {0x0, 0x0, 0x0})
	/home/rru/temp/beam/sdks/go/pkg/beam/core/runtime/exec/sdf.go:345 +0x118
github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/exec.(*DataSource).Process(0xc000496c80, {0x11f5450, 0xc00026b800})
	/home/rru/temp/beam/sdks/go/pkg/beam/core/runtime/exec/datasource.go:189 +0x510
github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/exec.(*Plan).Execute.func2({0x11f5450?, 0xc00026b800?})
	/home/rru/temp/beam/sdks/go/pkg/beam/core/runtime/exec/plan.go:131 +0x42
github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/exec.callNoPanic({0x11f5450?, 0xc00026b800?}, 0x0?)
	/home/rru/temp/beam/sdks/go/pkg/beam/core/runtime/exec/util.go:62 +0x6c
github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/exec.(*Plan).Execute(0xc0000d3c20, {0x11f5450, 0xc00026b800}, {0xc000137ae8, 0x12}, {{0x11ed500?, 0xc0002a0ae0?}, {0x120a9f8?, 0xc0002a0b10?}})
	/home/rru/temp/beam/sdks/go/pkg/beam/core/runtime/exec/plan.go:130 +0x3da
github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/harness.(*control).handleInstruction(0xc0004fc000, {0x11f5338, 0xc0004d91a0}, 0xc00009a3c0)
	/home/rru/temp/beam/sdks/go/pkg/beam/core/runtime/harness/harness.go:407 +0xab7
github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/harness.Main.func4({0x11f5338, 0xc0004d91a0}, 0xc00009a3c0)
	/home/rru/temp/beam/sdks/go/pkg/beam/core/runtime/harness/harness.go:193 +0x19d
created by github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/harness.Main
	/home/rru/temp/beam/sdks/go/pkg/beam/core/runtime/harness/harness.go:212 +0xfed

I'm now beyond my understanding of the SDK, but it seems to fail here, somehow thinking that a KV pair is coming in (?), judging from the caller here.

I'm not sure what's going on and would appreciate some guidance on addressing this. Perhaps I'm just missing something simple in the implementation?

@hnnsgstfssn
Contributor

Actually, I ran the failing pipeline on Beam 2.47.0.dev. Testing it now on Beam 2.45.0, it drains successfully, while draining on 2.46.0 fails. Could it be a regression?

lostluck added a commit that referenced this issue Mar 23, 2023
…SDK (#25808)

* Add periodic.Sequence and periodic.Impulse transforms

The new transforms extend support for the slowly updating side input
pattern [1] as tracked in [2].

An attempt to mirror the logic of the Python implementation [3] has been
made with minor idiomatic changes.

Java [4][5] and Python [6] have influenced the documentation and naming.

[1] https://beam.apache.org/documentation/patterns/side-inputs/
[2] #23106
[3] https://github.com/apache/beam/blob/v2.46.0/sdks/python/apache_beam/transforms/periodicsequence.py#L59
[4] https://beam.apache.org/releases/javadoc/2.46.0/org/apache/beam/sdk/transforms/PeriodicSequence.html
[5] https://beam.apache.org/releases/javadoc/2.46.0/org/apache/beam/sdk/transforms/PeriodicImpulse.html
[6] https://beam.apache.org/releases/pydoc/2.46.0/apache_beam.transforms.periodicsequence.html?highlight=periodicimpulse#module-apache_beam.transforms.periodicsequence

* Add licence to example

* periodic: address feedback and add unit tests

* periodic: emit bytes instead of int64

* periodic: adjust impulse argument validation

* examples/slowly_updating_side_input: fix periodic.Impulse call

* periodic: add licence to test file

* Apply suggestions from code review

Co-authored-by: Robert Burke <[email protected]>

* examples/slowly_updating_side_input: avoid nesting pipeline

* examples/slowly_updating_side_input: fix WindowInto argument order

* examples/slowly_updating_side_input: change impulse element type

* periodic: use testing.M to set the prism runner

* periodic: remove defunct Setup

* periodic: Sequence emits int64 index and Impulse emits []byte

* periodic: document start and end and add constructor

---------

Co-authored-by: Robert Burke <[email protected]>