diff --git a/CHANGES.md b/CHANGES.md
index 9437d489916f..56ceb8d19ad4 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -64,6 +64,7 @@
 ## New Features / Improvements

 * The Flink runner now supports Flink 1.16.x ([#25046](https://github.com/apache/beam/issues/25046)).
+* The Go SDK adds new transforms periodic.Impulse and periodic.Sequence that extend support for slowly updating side input patterns. ([#23106](https://github.com/apache/beam/issues/23106))

 ## Breaking Changes

diff --git a/sdks/go/examples/slowly_updating_side_input/slowly_updating_side_input.go b/sdks/go/examples/slowly_updating_side_input/slowly_updating_side_input.go
new file mode 100644
index 000000000000..1f4206d74e2c
--- /dev/null
+++ b/sdks/go/examples/slowly_updating_side_input/slowly_updating_side_input.go
@@ -0,0 +1,159 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// slowly_updating_side_input is an example pipeline that pairs a Pub/Sub main
+// input with a side input that periodic.Impulse refreshes at a fixed interval.
+package main
+
+import (
+	"context"
+	"flag"
+	"strings"
+	"time"
+
+	"cloud.google.com/go/pubsub"
+	"github.com/apache/beam/sdks/v2/go/pkg/beam"
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/window"
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/window/trigger"
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/io/pubsubio"
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/log"
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/options/gcpopts"
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/register"
+	_ "github.com/apache/beam/sdks/v2/go/pkg/beam/runners/dataflow"
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/transforms/periodic"
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/util/pubsubx"
+)
+
+// init registers the two DoFns plus the emitter and iterator types they use
+// through the register package.
+func init() {
+	register.Function4x0(update)
+	register.Function4x0(process)
+	register.Emitter2[int, string]()
+	register.Iter1[string]()
+}
+
+// update simulates an external call to get data for the side input.
+func update(ctx context.Context, t beam.EventTime, i int64, emit func(int, string)) {
+	log.Infof(ctx, "Making external call %d at %s", i, t.ToTime().Format(time.RFC3339))
+
+	// zero is the key used in beam.AddFixedKey which will be applied on the main input.
+	id, externalData := 0, "some fake data that changed at "+time.Now().Format(time.RFC3339)
+
+	emit(id, externalData)
+}
+
+// process simulates processing of main input. It reads side input by key.
+func process(ctx context.Context, k int, v []byte, side func(int) func(*string) bool) {
+	log.Infof(ctx, "Processing (key:%d,value:%q)", k, v)
+
+	iter := side(k)
+
+	var externalData []string
+	var externalDatum string
+	for iter(&externalDatum) {
+		externalData = append(externalData, externalDatum)
+	}
+
+	log.Infof(ctx, "Processing (key:%d,value:%q) with external data %q", k, v, strings.Join(externalData, ","))
+}
+
+// fatalf reports the formatted message through log.Fatalf when err is non-nil
+// and does nothing otherwise.
+func fatalf(err error, format string, args ...any) {
+	if err != nil {
+		log.Fatalf(context.TODO(), format, args...)
+	}
+}
+
+// main parses the flags, ensures the Pub/Sub topic exists, builds the main and
+// side inputs, and runs the pipeline on the "dataflow" runner.
+func main() {
+	var inputTopic, periodicSequenceStart, periodicSequenceEnd string
+	var periodicSequenceInterval time.Duration
+
+	now := time.Now()
+
+	flag.StringVar(&periodicSequenceStart, "periodic_sequence_start", now.Add(-1*time.Hour).Format(time.RFC3339),
+		"The time at which to start the periodic sequence.")
+
+	flag.StringVar(&periodicSequenceEnd, "periodic_sequence_end", now.Add(100*time.Hour).Format(time.RFC3339),
+		"The time at which to end the periodic sequence.")
+
+	flag.DurationVar(&periodicSequenceInterval, "periodic_sequence_interval", 1*time.Minute,
+		"The interval between periodic sequence output.")
+
+	flag.StringVar(&inputTopic, "input_topic", "input",
+		"The PubSub topic from which to read the main input data.")
+
+	flag.Parse()
+	beam.Init()
+	ctx := context.Background()
+	p, s := beam.NewPipelineWithRoot()
+
+	// Fail fast on malformed timestamps: time.Parse returns the zero time on
+	// error, which periodic.Impulse would silently replace with its defaults.
+	startTime, err := time.Parse(time.RFC3339, periodicSequenceStart)
+	fatalf(err, "Failed to parse periodic_sequence_start: %v", err)
+	endTime, err := time.Parse(time.RFC3339, periodicSequenceEnd)
+	fatalf(err, "Failed to parse periodic_sequence_end: %v", err)
+
+	project := gcpopts.GetProject(ctx)
+	client, err := pubsub.NewClient(ctx, project)
+	fatalf(err, "Failed to create client: %v", err)
+	_, err = pubsubx.EnsureTopic(ctx, client, inputTopic)
+	fatalf(err, "Failed to ensure topic: %v", err)
+
+	mainInput := beam.WindowInto(
+		s,
+		window.NewFixedWindows(periodicSequenceInterval),
+		beam.AddFixedKey( // simulate keyed data by adding a fixed key
+			s,
+			pubsubio.Read(
+				s,
+				project,
+				inputTopic,
+				nil,
+			),
+		),
+		beam.Trigger(trigger.Repeat(trigger.Always())),
+		beam.PanesDiscard(),
+	)
+
+	sideInput := beam.WindowInto(s, window.NewFixedWindows(periodicSequenceInterval),
+		beam.ParDo(
+			s,
+			update,
+			periodic.Impulse(
+				s,
+				startTime,
+				endTime,
+				periodicSequenceInterval,
+			),
+		),
+		beam.Trigger(trigger.Repeat(trigger.Always())),
+		beam.PanesDiscard(),
+	)
+
+	beam.ParDo0(s, process, mainInput,
+		beam.SideInput{
+			Input: sideInput,
+		},
+	)
+
+	if _, err := beam.Run(ctx, "dataflow", p); err != nil {
+		log.Exitf(ctx, "Failed to run job: %v", err)
+	}
+}
diff --git a/sdks/go/pkg/beam/transforms/periodic/periodic.go b/sdks/go/pkg/beam/transforms/periodic/periodic.go
new file mode 100644
index 000000000000..15634c4e1905
--- /dev/null
+++ b/sdks/go/pkg/beam/transforms/periodic/periodic.go
@@ -0,0 +1,211 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// Package periodic contains transformations for generating periodic sequences.
+package periodic
+
+import (
+	"context"
+	"fmt"
+	"math"
+	"reflect"
+	"time"
+
+	"github.com/apache/beam/sdks/v2/go/pkg/beam"
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/mtime"
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/window"
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/core/sdf"
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/io/rtrackers/offsetrange"
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/register"
+)
+
+// init registers the sequence-generating DoFn, its emitter type and the
+// SequenceDefinition element type.
+func init() {
+	register.DoFn5x2[context.Context, *sdf.ManualWatermarkEstimator, *sdf.LockRTracker, SequenceDefinition,
+		func(beam.EventTime, int64),
+		sdf.ProcessContinuation, error](&sequenceGenDoFn{})
+	register.Emitter2[beam.EventTime, int64]()
+	beam.RegisterType(reflect.TypeOf(SequenceDefinition{}))
+}
+
+// SequenceDefinition holds the configuration for generating a sequence of
+// timestamped elements at an interval.
+type SequenceDefinition struct {
+	// Interval is the distance between two consecutive output timestamps.
+	Interval time.Duration
+	// Start is the timestamp of the first output element.
+	Start time.Time
+	// End is the exclusive upper bound for output timestamps.
+	End time.Time
+}
+
+// sequenceGenDoFn is the splittable DoFn that turns a SequenceDefinition into
+// timestamped elements.
+type sequenceGenDoFn struct {
+	// now is the clock consulted while processing; Setup defaults it to
+	// time.Now when it has not been injected.
+	now func() time.Time
+}
+
+// Setup installs time.Now as the clock unless one was already provided.
+func (fn *sequenceGenDoFn) Setup() {
+	if fn.now == nil {
+		fn.now = time.Now
+	}
+}
+
+// CreateInitialRestriction returns the offset range [0, n) where n is the
+// number of timestamps Start, Start+Interval, ... that fall before End, i.e.
+// the ceiling of (End-Start)/Interval.
+//
+// The count is computed with integer arithmetic: dividing two Durations
+// truncates, so the remainder is checked explicitly to round up. An empty
+// range is returned when End is not after Start. A non-positive Interval
+// cannot be divided by and yields a single output at Start, mirroring how
+// Impulse treats such intervals as the maximum interval.
+func (fn *sequenceGenDoFn) CreateInitialRestriction(sd SequenceDefinition) offsetrange.Restriction {
+	span := sd.End.Sub(sd.Start)
+	if span <= 0 {
+		return offsetrange.Restriction{Start: 0, End: 0}
+	}
+	if sd.Interval <= 0 {
+		return offsetrange.Restriction{Start: 0, End: 1}
+	}
+
+	totalOutputs := int64(span / sd.Interval)
+	if span%sd.Interval != 0 {
+		// A partial trailing interval still contains one output timestamp.
+		totalOutputs++
+	}
+	return offsetrange.Restriction{
+		Start: 0,
+		End:   totalOutputs,
+	}
+}
+
+// CreateTracker wraps an offset range tracker for rest in a LockRTracker.
+func (fn *sequenceGenDoFn) CreateTracker(rest offsetrange.Restriction) *sdf.LockRTracker {
+	return sdf.NewLockRTracker(offsetrange.NewTracker(rest))
+}
+
+// RestrictionSize reports the size of rest as computed by rest.Size.
+func (fn *sequenceGenDoFn) RestrictionSize(_ SequenceDefinition, rest offsetrange.Restriction) float64 {
+	return rest.Size()
+}
+
+// SplitRestriction performs no initial split and returns rest as the only
+// restriction.
+func (fn *sequenceGenDoFn) SplitRestriction(_ SequenceDefinition, rest offsetrange.Restriction) []offsetrange.Restriction {
+	return []offsetrange.Restriction{rest}
+}
+
+// TruncateRestriction immediately truncates the entire restriction.
+func (fn *sequenceGenDoFn) TruncateRestriction(_ *sdf.LockRTracker, _ SequenceDefinition) offsetrange.Restriction {
+	return offsetrange.Restriction{}
+}
+
+// CreateWatermarkEstimator returns a zero-valued manual watermark estimator;
+// ProcessElement advances it explicitly.
+func (fn *sequenceGenDoFn) CreateWatermarkEstimator() *sdf.ManualWatermarkEstimator {
+	return &sdf.ManualWatermarkEstimator{}
+}
+
+// ProcessElement emits every element of the sequence whose timestamp is
+// already before the current time, then asks to be resumed when the next one
+// is due.
+//
+// Output index i carries the timestamp sd.Start + i*sd.Interval and the value
+// of that timestamp in Unix milliseconds. The watermark is moved to the next
+// pending timestamp after each emission. Processing stops when the tracker
+// reports an error or that the restriction is done; a failed claim for any
+// other reason resumes after sd.Interval.
+func (fn *sequenceGenDoFn) ProcessElement(ctx context.Context, we *sdf.ManualWatermarkEstimator, rt *sdf.LockRTracker, sd SequenceDefinition, emit func(beam.EventTime, int64)) (sdf.ProcessContinuation, error) {
+	currentOutputIndex := rt.GetRestriction().(offsetrange.Restriction).Start
+	currentOutputTimestamp := sd.Start.Add(sd.Interval * time.Duration(currentOutputIndex))
+	currentTime := fn.now()
+	we.UpdateWatermark(currentOutputTimestamp)
+	for currentOutputTimestamp.Before(currentTime) {
+		if !rt.TryClaim(currentOutputIndex) {
+			if err := rt.GetError(); err != nil || rt.IsDone() {
+				// Stop processing on error or completion
+				return sdf.StopProcessing(), rt.GetError()
+			}
+			return sdf.ResumeProcessingIn(sd.Interval), nil
+		}
+
+		emit(mtime.FromTime(currentOutputTimestamp), currentOutputTimestamp.UnixMilli())
+		currentOutputIndex++
+		currentOutputTimestamp = sd.Start.Add(sd.Interval * time.Duration(currentOutputIndex))
+		currentTime = fn.now()
+		we.UpdateWatermark(currentOutputTimestamp)
+	}
+
+	// Measure the delay with the DoFn's own clock rather than time.Until so an
+	// injected clock governs the resume delay as well as the emission loop.
+	return sdf.ResumeProcessingIn(currentOutputTimestamp.Sub(fn.now())), nil
+}
+
+// impulseConfig collects the settings gathered from ImpulseOption values.
+type impulseConfig struct {
+	// ApplyWindow requests fixed windowing of the Impulse output.
+	ApplyWindow bool
+
+	// now, when set, is passed to the generating DoFn as its clock.
+	now func() time.Time
+}
+
+// impulseOption mutates an impulseConfig and may report an invalid setting.
+type impulseOption func(*impulseConfig) error
+
+// ImpulseOption is a function that configures an [Impulse] transform.
+type ImpulseOption = impulseOption
+
+// WithApplyWindow configures the [Impulse] transform to apply a fixed window
+// transform to the output PCollection.
+func WithApplyWindow() ImpulseOption {
+	return func(conf *impulseConfig) error {
+		conf.ApplyWindow = true
+		return nil
+	}
+}
+
+// withNowFunc returns an option that records now in the configuration;
+// Impulse hands it to the generating DoFn as its clock.
+func withNowFunc(now func() time.Time) ImpulseOption {
+	return func(conf *impulseConfig) error {
+		conf.now = now
+		return nil
+	}
+}
+
+// Impulse is a PTransform that produces timestamped elements at a fixed
+// runtime interval. With [WithApplyWindow] the output is additionally
+// windowed into fixed windows of interval size.
+//
+// It behaves like the [Sequence] transform, but can be used as the first
+// transform in a pipeline.
+//
+// The arguments are normalized as follows:
+//   - an interval <= 0 is replaced by [math.MaxInt64]
+//   - a zero value start is replaced by the current time
+//   - a start after end is replaced by end
+//
+// The generated PCollection is unbounded and each element is the
+// [time.Time.UnixMilli] int64 value of its output timestamp.
+//
+// Impulse panics if an option reports an error.
+func Impulse(s beam.Scope, start, end time.Time, interval time.Duration, opts ...ImpulseOption) beam.PCollection {
+	var conf impulseConfig
+	for _, apply := range opts {
+		err := apply(&conf)
+		if err != nil {
+			panic(fmt.Errorf("periodic.Impulse: invalid option: %v", err))
+		}
+	}
+
+	if interval <= 0 {
+		interval = math.MaxInt64
+	}
+	if start.IsZero() {
+		start = time.Now()
+	}
+	if start.After(end) {
+		start = end
+	}
+
+	gen := &sequenceGenDoFn{now: conf.now}
+	return genImpulse(s.Scope("periodic.Impulse"), start, end, interval, conf, gen)
+}
+
+// genImpulse creates a single SequenceDefinition element, expands it with fn
+// and, when conf.ApplyWindow is set, windows the result into fixed windows of
+// interval size.
+func genImpulse(s beam.Scope, start, end time.Time, interval time.Duration, conf impulseConfig, fn *sequenceGenDoFn) beam.PCollection {
+	definition := SequenceDefinition{Interval: interval, Start: start, End: end}
+	seed := beam.Create(s.Scope("ImpulseElement"), definition)
+	out := genSequence(s, seed, fn)
+	if !conf.ApplyWindow {
+		return out
+	}
+	return beam.WindowInto(s.Scope("ApplyWindowing"),
+		window.NewFixedWindows(interval), out)
+}
+
+// Sequence is a PTransform that produces timestamped elements at a fixed
+// runtime interval.
+//
+// Every element is assigned a timestamp and is only output once the worker
+// clock reaches that timestamp. Sequence cannot guarantee that elements are
+// output at their exact timestamp, but it guarantees that they are never
+// output before it.
+//
+// No element is output prior to the start time.
+//
+// Sequence consumes [SequenceDefinition] elements. For each one it generates
+// output elements as follows:
+//
+//   - if the element timestamp is earlier than the current runtime, the
+//     element is output.
+//   - otherwise it waits until the next element timestamp.
+//
+// The generated PCollection is unbounded and each element is the
+// [time.Time.UnixMilli] int64 value of its output timestamp.
+func Sequence(s beam.Scope, col beam.PCollection) beam.PCollection {
+	scoped := s.Scope("periodic.Sequence")
+	return genSequence(scoped, col, &sequenceGenDoFn{})
+}
+
+// genSequence applies fn to col as a ParDo under the "GenSequence" scope.
+func genSequence(s beam.Scope, col beam.PCollection, fn *sequenceGenDoFn) beam.PCollection {
+	scoped := s.Scope("GenSequence")
+	return beam.ParDo(scoped, fn, col)
+}