Skip to content

Commit

Permalink
[#23106] Add periodic.Sequence and periodic.Impulse transforms to Go …
Browse files Browse the repository at this point in the history
…SDK (#25808)

* Add periodic.Sequence and periodic.Impulse transforms

The new transforms extend support for the slowly updating side input
pattern [1] as tracked in [2].

An attempt to mirror the logic of the Python implementation [3] has been
made with minor idiomatic changes.

Java [4][5] and Python [6] have influenced the documentation and naming.

[1] https://beam.apache.org/documentation/patterns/side-inputs/
[2] #23106
[3] https://github.com/apache/beam/blob/v2.46.0/sdks/python/apache_beam/transforms/periodicsequence.py#L59
[4] https://beam.apache.org/releases/javadoc/2.46.0/org/apache/beam/sdk/transforms/PeriodicSequence.html
[5] https://beam.apache.org/releases/javadoc/2.46.0/org/apache/beam/sdk/transforms/PeriodicImpulse.html
[6] https://beam.apache.org/releases/pydoc/2.46.0/apache_beam.transforms.periodicsequence.html?highlight=periodicimpulse#module-apache_beam.transforms.periodicsequence

* Add licence to example

* periodic: address feedback and add unit tests

* periodic: emit bytes instead of int64

* periodic: adjust impulse argument validation

* examples/slowly_updating_side_input: fix periodic.Impulse call

* periodic: add licence to test file

* Apply suggestions from code review

Co-authored-by: Robert Burke <[email protected]>

* examples/slowly_updating_side_input: avoid nesting pipeline

* examples/slowly_updating_side_input: fix WindowInto argument order

* examples/slowly_updating_side_input: change impulse element type

* periodic: use testing.M to set the prism runner

* periodic: remove defunct Setup

* periodic: Sequence emits int64 index and Impulse emits []byte

* periodic: document start and end and add constructor

---------

Co-authored-by: Robert Burke <[email protected]>
  • Loading branch information
hnnsgstfssn and lostluck authored Mar 23, 2023
1 parent 4484c19 commit 16cd2a9
Show file tree
Hide file tree
Showing 4 changed files with 385 additions and 0 deletions.
2 changes: 2 additions & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,8 @@
* Schema'd PTransforms can now be directly applied to Beam dataframes just like PCollections.
(Note that when doing multiple operations, it may be more efficient to explicitly chain the operations
like `df | (Transform1 | Transform2 | ...)` to avoid excessive conversions.)
* The Go SDK adds new transforms periodic.Impulse and periodic.Sequence that extend support
for slowly updating side input patterns. ([#23106](https://github.com/apache/beam/issues/23106))

## Breaking Changes

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,141 @@
// Licensed to the Apache Software Foundation (ASF) under one or more
// contributor license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright ownership.
// The ASF licenses this file to You under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

// slowly_updating_side_input is an example pipeline demonstrating the pattern described
// at https://beam.apache.org/documentation/patterns/side-inputs/.
package main

import (
"context"
"flag"
"strings"
"time"

"cloud.google.com/go/pubsub"
"github.com/apache/beam/sdks/v2/go/pkg/beam"
"github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/window"
"github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/window/trigger"
"github.com/apache/beam/sdks/v2/go/pkg/beam/io/pubsubio"
"github.com/apache/beam/sdks/v2/go/pkg/beam/log"
"github.com/apache/beam/sdks/v2/go/pkg/beam/options/gcpopts"
"github.com/apache/beam/sdks/v2/go/pkg/beam/register"
_ "github.com/apache/beam/sdks/v2/go/pkg/beam/runners/dataflow"
"github.com/apache/beam/sdks/v2/go/pkg/beam/transforms/periodic"
"github.com/apache/beam/sdks/v2/go/pkg/beam/util/pubsubx"
)

// init registers the two DoFn functions of this example (update and process)
// together with the emitter and iterator signatures they use.
func init() {
// update and process each take four parameters and return nothing.
register.Function4x0(update)
register.Function4x0(process)
// Matches update's emit func(int, string).
register.Emitter2[int, string]()
// Matches the func(*string) bool iterator returned by process's side input lookup.
register.Iter1[string]()
}

// update stands in for an external lookup that refreshes the side input.
// For every impulse it logs the element's event time and emits a single
// (key, value) pair whose value embeds the current wall-clock time.
func update(ctx context.Context, t beam.EventTime, _ []byte, emit func(int, string)) {
	eventTime := t.ToTime().Format(time.RFC3339)
	log.Infof(ctx, "Making external call at event time %s", eventTime)

	// The key is zero because beam.AddFixedKey, applied on the main input,
	// uses zero as its key.
	const fixedKey = 0
	payload := "some fake data that changed at " + time.Now().Format(time.RFC3339)

	emit(fixedKey, payload)
}

// process simulates processing of the main input. It looks up the side input
// values stored under the element's key, collects them, and logs them next to
// the element.
func process(ctx context.Context, k int, v []byte, side func(int) func(*string) bool) {
	log.Infof(ctx, "Processing (key:%d,value:%q)", k, v)

	// side(k) yields an iterator over every side input value for key k.
	next := side(k)

	var (
		collected []string
		datum     string
	)
	for next(&datum) {
		collected = append(collected, datum)
	}

	joined := strings.Join(collected, ",")
	log.Infof(ctx, "Processing (key:%d,value:%q) with external data %q", k, v, joined)
}

// fatalf logs the formatted message through log.Fatalf when err is non-nil
// and does nothing otherwise. err only gates the call; callers that want it
// in the message must also pass it in args.
func fatalf(err error, format string, args ...interface{}) {
	if err == nil {
		return
	}
	log.Fatalf(context.TODO(), format, args...)
}

// main builds and runs the slowly updating side input example.
//
// The main input is read with pubsubio.Read from the topic named by
// -input_topic, keyed with a fixed key and placed into fixed windows. The side
// input is produced by periodic.Impulse between -periodic_sequence_start and
// -periodic_sequence_end (both RFC 3339) every -periodic_sequence_interval,
// passed through update, and windowed with the same interval. process then
// consumes the main input with the side input attached.
func main() {
	var inputTopic, periodicSequenceStart, periodicSequenceEnd string
	var periodicSequenceInterval time.Duration

	now := time.Now()

	flag.StringVar(&periodicSequenceStart, "periodic_sequence_start", now.Add(-1*time.Hour).Format(time.RFC3339),
		"The time at which to start the periodic sequence.")

	flag.StringVar(&periodicSequenceEnd, "periodic_sequence_end", now.Add(100*time.Hour).Format(time.RFC3339),
		"The time at which to end the periodic sequence.")

	flag.DurationVar(&periodicSequenceInterval, "periodic_sequence_interval", 1*time.Minute,
		"The interval between periodic sequence output.")

	flag.StringVar(&inputTopic, "input_topic", "input",
		"The PubSub topic from which to read the main input data.")

	flag.Parse()
	beam.Init()
	ctx := context.Background()
	p, s := beam.NewPipelineWithRoot()

	project := gcpopts.GetProject(ctx)
	client, err := pubsub.NewClient(ctx, project)
	fatalf(err, "Failed to create client: %v", err)
	_, err = pubsubx.EnsureTopic(ctx, client, inputTopic)
	fatalf(err, "Failed to ensure topic: %v", err)

	source := pubsubio.Read(s, project, inputTopic, nil)
	keyedSource := beam.AddFixedKey(s, source) // simulate keyed data by adding a fixed key
	mainInput := beam.WindowInto(
		s,
		window.NewFixedWindows(periodicSequenceInterval),
		keyedSource,
		beam.Trigger(trigger.Repeat(trigger.Always())),
		beam.PanesDiscard(),
	)

	// Reject malformed time flags instead of silently continuing with the
	// zero time.Time that time.Parse returns alongside an error.
	startTime, err := time.Parse(time.RFC3339, periodicSequenceStart)
	fatalf(err, "Failed to parse periodic_sequence_start %q: %v", periodicSequenceStart, err)
	endTime, err := time.Parse(time.RFC3339, periodicSequenceEnd)
	fatalf(err, "Failed to parse periodic_sequence_end %q: %v", periodicSequenceEnd, err)

	// Generate an impulse every period.
	periodicImp := periodic.Impulse(s, startTime, endTime, periodicSequenceInterval, false)

	// Use the impulse to trigger some other ordinary transform.
	updatedImp := beam.ParDo(s, update, periodicImp)

	// Window for use as a side input, to allow the input to change with windows.
	sideInput := beam.WindowInto(s, window.NewFixedWindows(periodicSequenceInterval),
		updatedImp,
		beam.Trigger(trigger.Repeat(trigger.Always())),
		beam.PanesDiscard(),
	)

	beam.ParDo0(s, process, mainInput,
		beam.SideInput{
			Input: sideInput,
		},
	)

	// Reuse the context created above rather than building a second one.
	if _, err := beam.Run(ctx, "dataflow", p); err != nil {
		log.Exitf(ctx, "Failed to run job: %v", err)
	}
}
188 changes: 188 additions & 0 deletions sdks/go/pkg/beam/transforms/periodic/periodic.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,188 @@
// Licensed to the Apache Software Foundation (ASF) under one or more
// contributor license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright ownership.
// The ASF licenses this file to You under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

// Package periodic contains transformations for generating periodic sequences.
package periodic

import (
"context"
"reflect"
"time"

"github.com/apache/beam/sdks/v2/go/pkg/beam"
"github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/mtime"
"github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/window"
"github.com/apache/beam/sdks/v2/go/pkg/beam/core/sdf"
"github.com/apache/beam/sdks/v2/go/pkg/beam/io/rtrackers/offsetrange"
"github.com/apache/beam/sdks/v2/go/pkg/beam/register"
)

// init registers everything this package hands to Beam: the sequence
// generating DoFn, the sequenceToImpulse function, the emitter signatures both
// use, and the SequenceDefinition element type.
func init() {
// The type parameters mirror sequenceGenDoFn.ProcessElement: five inputs
// (context, watermark estimator, restriction tracker, element, emitter) and
// two outputs (process continuation, error).
register.DoFn5x2[context.Context, *sdf.ManualWatermarkEstimator, *sdf.LockRTracker, SequenceDefinition,
func(beam.EventTime, int64),
sdf.ProcessContinuation, error](&sequenceGenDoFn{})
// Emitter used by ProcessElement: (event time, index).
register.Emitter2[beam.EventTime, int64]()
register.Function2x0(sequenceToImpulse)
// Emitter used by sequenceToImpulse.
register.Emitter1[[]byte]()
beam.RegisterType(reflect.TypeOf(SequenceDefinition{}))
}

// SequenceDefinition describes a sequence of timestamped elements generated
// at a fixed interval between a start and an end instant.
type SequenceDefinition struct {
	// Interval is the spacing between the timestamps of consecutive elements.
	Interval time.Duration

	// Start is the number of milliseconds since the Unix epoch.
	Start int64

	// End is the number of milliseconds since the Unix epoch.
	End int64
}

// NewSequenceDefinition builds a [SequenceDefinition] from a start and end
// [time.Time] and an interval [time.Duration]. Both instants are stored as
// milliseconds since the Unix epoch.
func NewSequenceDefinition(start, end time.Time, interval time.Duration) SequenceDefinition {
	var sd SequenceDefinition
	sd.Interval = interval
	sd.Start = start.UnixMilli()
	sd.End = end.UnixMilli()
	return sd
}

// sequenceGenDoFn is the DoFn behind Sequence and Impulse. It has no fields;
// its methods derive an offset range restriction from each SequenceDefinition
// element and emit one timestamped index per offset.
type sequenceGenDoFn struct{}

// CreateInitialRestriction returns the offset range [0, n) where n is the
// number of whole intervals that fit between sd.Start and sd.End; each offset
// corresponds to one output element.
//
// A non-positive Interval, or an End that is not after Start, describes a
// sequence with no outputs. In those cases an empty restriction is returned
// rather than dividing by zero (a run-time panic) or yielding a negative End.
// Impulse can produce such a definition when its start is not before its end.
func (fn *sequenceGenDoFn) CreateInitialRestriction(sd SequenceDefinition) offsetrange.Restriction {
	if sd.Interval <= 0 {
		return offsetrange.Restriction{}
	}
	span := mtime.Time(sd.End).ToTime().Sub(mtime.Time(sd.Start).ToTime())
	if span <= 0 {
		return offsetrange.Restriction{}
	}
	return offsetrange.Restriction{
		Start: int64(0),
		End:   int64(span / sd.Interval),
	}
}

// CreateTracker builds an offset range tracker for rest and wraps it in an
// sdf.LockRTracker.
func (fn *sequenceGenDoFn) CreateTracker(rest offsetrange.Restriction) *sdf.LockRTracker {
	tracker := offsetrange.NewTracker(rest)
	return sdf.NewLockRTracker(tracker)
}

// RestrictionSize reports the size of rest as computed by the restriction
// itself; the element is not consulted.
func (fn *sequenceGenDoFn) RestrictionSize(_ SequenceDefinition, rest offsetrange.Restriction) float64 {
	size := rest.Size()
	return size
}

// SplitRestriction performs no initial splitting: the given restriction is
// returned unchanged as the only element of the result.
func (fn *sequenceGenDoFn) SplitRestriction(_ SequenceDefinition, rest offsetrange.Restriction) []offsetrange.Restriction {
	parts := []offsetrange.Restriction{rest}
	return parts
}

// TruncateRestriction immediately truncates the entire restriction by
// returning the zero-value (empty) restriction.
func (fn *sequenceGenDoFn) TruncateRestriction(_ *sdf.LockRTracker, _ SequenceDefinition) offsetrange.Restriction {
	var empty offsetrange.Restriction
	return empty
}

// CreateWatermarkEstimator returns a fresh zero-value manual watermark
// estimator; ProcessElement advances it explicitly.
func (fn *sequenceGenDoFn) CreateWatermarkEstimator() *sdf.ManualWatermarkEstimator {
	return new(sdf.ManualWatermarkEstimator)
}

// ProcessElement emits the indices of the sequence whose timestamps have
// already passed on the local clock.
//
// It starts at the Start offset of the tracker's restriction. The timestamp of
// index i is sd.Start plus i times sd.Interval. While that timestamp is before
// the current time, the index is claimed and emitted with the timestamp as its
// event time, and the watermark is moved to the timestamp of the next index.
//
// When a claim fails, processing stops if the tracker reports an error or is
// done, and otherwise resumes after sd.Interval. When the next timestamp is
// still in the future, processing resumes once that timestamp is reached.
func (fn *sequenceGenDoFn) ProcessElement(ctx context.Context, we *sdf.ManualWatermarkEstimator, rt *sdf.LockRTracker, sd SequenceDefinition, emit func(beam.EventTime, int64)) (sdf.ProcessContinuation, error) {
	// timestampAt maps an output index to its timestamp.
	timestampAt := func(i int64) time.Time {
		return mtime.Time(sd.Start).ToTime().Add(sd.Interval * time.Duration(i))
	}

	idx := rt.GetRestriction().(offsetrange.Restriction).Start
	ts := timestampAt(idx)
	now := time.Now()
	we.UpdateWatermark(ts)

	for ts.Before(now) {
		if !rt.TryClaim(idx) {
			if err := rt.GetError(); err != nil || rt.IsDone() {
				// Stop processing on error or completion
				return sdf.StopProcessing(), rt.GetError()
			}
			return sdf.ResumeProcessingIn(sd.Interval), nil
		}

		emit(mtime.FromTime(ts), idx)
		idx++
		ts = timestampAt(idx)
		now = time.Now()
		we.UpdateWatermark(ts)
	}

	return sdf.ResumeProcessingIn(time.Until(ts)), nil
}

// Impulse is a PTransform which generates a sequence of timestamped
// elements at fixed runtime intervals. If applyWindow is specified, each
// element will be assigned to its own fixed window of interval size.
//
// It behaves like the [Sequence] transform, but it can be used as the first
// transform in a pipeline.
//
// The arguments are adjusted as follows, in this order:
//   - a zero value [time.Time] start is replaced by the current time
//   - a start after end is replaced by end
//   - start and end are normalized with [mtime.Normalize]
//   - an interval <= 0 or > end.Sub(start) is replaced by end.Sub(start)
//
// The PCollection<[]byte> generated by Impulse is unbounded.
func Impulse(s beam.Scope, start, end time.Time, interval time.Duration, applyWindow bool) beam.PCollection {
	if start.IsZero() {
		start = time.Now()
	}
	if end.Before(start) {
		start = end
	}

	start = mtime.Normalize(mtime.FromTime(start)).ToTime()
	end = mtime.Normalize(mtime.FromTime(end)).ToTime()

	if span := end.Sub(start); interval <= 0 || interval > span {
		interval = span
	}

	scope := s.Scope("periodic.Impulse")
	return genImpulse(scope, start, end, interval, applyWindow, &sequenceGenDoFn{})
}

// genImpulse wires up the Impulse pipeline: a single SequenceDefinition
// element is created, expanded into a sequence by fn, and each sequence
// element is mapped to an empty []byte. When applyWindow is set the result is
// placed into fixed windows of interval size.
func genImpulse(s beam.Scope, start, end time.Time, interval time.Duration, applyWindow bool, fn *sequenceGenDoFn) beam.PCollection {
	def := NewSequenceDefinition(start, end, interval)
	seed := beam.Create(s.Scope("ImpulseElement"), def)
	indices := genSequence(s, seed, fn)
	impulses := beam.ParDo(s, sequenceToImpulse, indices)
	if !applyWindow {
		return impulses
	}
	windowing := window.NewFixedWindows(interval)
	return beam.WindowInto(s.Scope("ApplyWindowing"), windowing, impulses)
}

// sequenceToImpulse drops the sequence index and emits an empty, non-nil
// byte slice in its place.
func sequenceToImpulse(_ int64, emit func([]byte)) {
	impulse := make([]byte, 0)
	emit(impulse)
}

// Sequence is a PTransform which generates a sequence of timestamped
// elements at fixed runtime intervals.
//
// Each element is assigned a timestamp and is only output once the worker
// clock reaches that timestamp. Sequence cannot guarantee that elements are
// output at their exact timestamp, but it does guarantee that no element is
// output before its timestamp has been reached at runtime.
//
// No elements are output prior to the start time.
//
// Sequence receives [SequenceDefinition] elements. For each one it generates
// output elements as follows:
//
//   - if the element timestamp is less than the current runtime, the element
//     is output.
//   - if the element timestamp is greater than the current runtime, it waits
//     until the next element timestamp.
//
// The PCollection<int64> generated by Sequence is unbounded.
func Sequence(s beam.Scope, col beam.PCollection) beam.PCollection {
	scope := s.Scope("periodic.Sequence")
	return genSequence(scope, col, &sequenceGenDoFn{})
}

// genSequence applies fn to col as a ParDo under the "GenSequence" scope.
func genSequence(s beam.Scope, col beam.PCollection, fn *sequenceGenDoFn) beam.PCollection {
	scope := s.Scope("GenSequence")
	return beam.ParDo(scope, fn, col)
}
54 changes: 54 additions & 0 deletions sdks/go/pkg/beam/transforms/periodic/periodic_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
// Licensed to the Apache Software Foundation (ASF) under one or more
// contributor license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright ownership.
// The ASF licenses this file to You under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package periodic

import (
"os"
"testing"
"time"

"github.com/apache/beam/sdks/v2/go/pkg/beam"
_ "github.com/apache/beam/sdks/v2/go/pkg/beam/runners/prism"
"github.com/apache/beam/sdks/v2/go/pkg/beam/testing/passert"
"github.com/apache/beam/sdks/v2/go/pkg/beam/testing/ptest"
)

// TestMain runs the package tests through ptest with "prism" as the default
// runner and exits with the resulting code.
func TestMain(m *testing.M) {
	code := ptest.MainRetWithDefault(m, "prism")
	os.Exit(code)
}

// TestSequence checks that a one-minute definition with a one-second interval
// produces 60 elements.
func TestSequence(t *testing.T) {
	p, s := beam.NewPipelineWithRoot()

	def := SequenceDefinition{
		Interval: time.Second,
		Start:    0,
		End:      time.Minute.Milliseconds(),
	}
	seq := Sequence(s, beam.Create(s, def))

	passert.Count(s, seq, "SecondsInMinute", 60)
	ptest.RunAndValidate(t, p)
}

// TestImpulse checks that Impulse over the first minute after the Unix epoch
// with a one-second interval produces 60 elements.
func TestImpulse(t *testing.T) {
	p, s := beam.NewPipelineWithRoot()

	epoch := time.Unix(0, 0)
	impulses := Impulse(s, epoch, epoch.Add(time.Minute), time.Second, false)

	passert.Count(s, impulses, "SecondsInMinute", 60)
	ptest.RunAndValidate(t, p)
}

0 comments on commit 16cd2a9

Please sign in to comment.