Skip to content

Commit

Permalink
[apache#22737] Fit & Finish for Go SDK timer support. (apache#26782)
Browse files Browse the repository at this point in the history
  • Loading branch information
lostluck authored and cushon committed May 24, 2024
1 parent e9359d0 commit 3cbe910
Show file tree
Hide file tree
Showing 14 changed files with 1,141 additions and 210 deletions.
104 changes: 63 additions & 41 deletions sdks/go/examples/timer_wordcap/wordcap.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,15 +21,12 @@
package main

import (
"bytes"
"context"
"encoding/binary"
"flag"
"fmt"
"time"

"github.com/apache/beam/sdks/v2/go/pkg/beam"
"github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/mtime"
"github.com/apache/beam/sdks/v2/go/pkg/beam/core/state"
"github.com/apache/beam/sdks/v2/go/pkg/beam/core/timers"
"github.com/apache/beam/sdks/v2/go/pkg/beam/log"
Expand Down Expand Up @@ -62,21 +59,7 @@ func NewStateful() *Stateful {
}
}

func (s *Stateful) OnTimer(ctx context.Context, ts beam.EventTime, tp timers.Provider, key, timerKey, timerTag string, emit func(string, string)) {
switch timerKey {
case "outputState":
log.Infof(ctx, "Timer outputState fired on stateful for element: %v.", key)
s.OutputState.Set(tp, ts.ToTime().Add(5*time.Second), timers.WithTag("1"))
switch timerTag {
case "1":
s.OutputState.Clear(tp)
log.Infof(ctx, "Timer with tag 1 fired on outputState stateful DoFn.")
emit(timerKey, timerTag)
}
}
}

func (s *Stateful) ProcessElement(ctx context.Context, ts beam.EventTime, sp state.Provider, tp timers.Provider, key, word string, emit func(string, string)) error {
func (s *Stateful) ProcessElement(ctx context.Context, ts beam.EventTime, sp state.Provider, tp timers.Provider, key, word string, _ func(beam.EventTime, string, string)) error {
s.ElementBag.Add(sp, word)
s.MinTime.Add(sp, int64(ts))

Expand All @@ -85,23 +68,77 @@ func (s *Stateful) ProcessElement(ctx context.Context, ts beam.EventTime, sp sta
return err
}
if !ok {
toFire = int64(mtime.Now().Add(1 * time.Minute))
toFire = int64(time.Now().Add(30 * time.Second).UnixMilli())
}
minTime, _, err := s.MinTime.Read(sp)
if err != nil {
return err
}

s.OutputState.Set(tp, time.UnixMilli(toFire), timers.WithOutputTimestamp(time.UnixMilli(minTime)), timers.WithTag(word))
s.OutputState.Set(tp, time.UnixMilli(toFire), timers.WithOutputTimestamp(time.UnixMilli(minTime)))
// A timer can be set with independent to fire with independant string tags.
s.OutputState.Set(tp, time.UnixMilli(toFire), timers.WithTag(word), timers.WithOutputTimestamp(time.UnixMilli(minTime)))
s.TimerTime.Write(sp, toFire)

return nil
}

func (s *Stateful) OnTimer(ctx context.Context, ts beam.EventTime, sp state.Provider, tp timers.Provider, key string, timer timers.Context, emit func(beam.EventTime, string, string)) {
log.Infof(ctx, "Timer fired for key %q, for family %q and tag %q", key, timer.Family, timer.Tag)

const tag = "emit" // Tags can be arbitrary strings, but we're associating behavior with this tag in this method.

// Check which timer has fired.
switch timer.Family {
case s.OutputState.Family:
switch timer.Tag {
case "":
// Timers can be set within the OnTimer method.
// In this case the emit tag timer to fire in 5 seconds.
s.OutputState.Set(tp, ts.ToTime().Add(5*time.Second), timers.WithTag(tag))
case tag:
// When the emit tag fires, read the batched data.
es, ok, err := s.ElementBag.Read(sp)
if err != nil {
log.Errorf(ctx, "error reading ElementBag: %v", err)
return
}
if !ok {
log.Infof(ctx, "No elements in bag.")
return
}
minTime, _, err := s.MinTime.Read(sp)
if err != nil {
log.Errorf(ctx, "error reading ElementBag: %v", err)
return
}
log.Infof(ctx, "Emitting %d elements", len(es))
for _, word := range es {
emit(beam.EventTime(minTime), key, word)
}
// Clean up the state that has been evicted.
s.ElementBag.Clear(sp)
s.MinTime.Clear(sp)
s.OutputState.ClearTag(tp, tag) // Clean up the fired timer tag. (Temporary workaround for a runner bug.)
}
}
}

func init() {
register.DoFn7x1[context.Context, beam.EventTime, state.Provider, timers.Provider, string, string, func(string, string), error](&Stateful{})
register.Emitter2[string, string]()
register.DoFn7x1[context.Context, beam.EventTime, state.Provider, timers.Provider, string, string, func(beam.EventTime, string, string), error](&Stateful{})
register.Emitter3[beam.EventTime, string, string]()
register.Emitter2[beam.EventTime, int64]()
register.Function1x2(toKeyedString)
}

func generateSequence(s beam.Scope, now time.Time, duration, interval time.Duration) beam.PCollection {
s = s.Scope("generateSequence")
def := beam.Create(s, periodic.NewSequenceDefinition(now, now.Add(duration), interval))
seq := periodic.Sequence(s, def)
return seq
}

func toKeyedString(b int64) (string, string) {
return "test", fmt.Sprintf("%03d", b)
}

func main() {
Expand All @@ -110,26 +147,11 @@ func main() {

ctx := context.Background()

p := beam.NewPipeline()
s := p.Root()

out := periodic.Impulse(s, time.Now(), time.Now().Add(5*time.Minute), 5*time.Second, true)

intOut := beam.ParDo(s, func(b []byte) int64 {
var val int64
buf := bytes.NewReader(b)
binary.Read(buf, binary.BigEndian, &val)
return val
}, out)

str := beam.ParDo(s, func(b int64) string {
return fmt.Sprintf("%03d", b)
}, intOut)
p, s := beam.NewPipelineWithRoot()

keyed := beam.ParDo(s, func(ctx context.Context, ts beam.EventTime, s string) (string, string) {
return "test", s
}, str)
out := generateSequence(s, time.Now(), 1*time.Minute, 5*time.Second)

keyed := beam.ParDo(s, toKeyedString, out)
timed := beam.ParDo(s, NewStateful(), keyed)
debug.Printf(s, "post stateful: %v", timed)

Expand Down
1 change: 0 additions & 1 deletion sdks/go/pkg/beam/core/graph/edge.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,6 @@ type MultiEdge struct {
DoFn *DoFn // ParDo
RestrictionCoder *coder.Coder // SplittableParDo
StateCoders map[string]*coder.Coder // Stateful ParDo
TimerCoders *coder.Coder // Stateful ParDo
CombineFn *CombineFn // Combine
AccumCoder *coder.Coder // Combine
Value []byte // Impulse
Expand Down
33 changes: 22 additions & 11 deletions sdks/go/pkg/beam/core/runtime/exec/coder.go
Original file line number Diff line number Diff line change
Expand Up @@ -1228,8 +1228,7 @@ func EncodeWindowedValueHeader(enc WindowEncoder, ws []typex.Window, t typex.Eve
if err := enc.Encode(ws, w); err != nil {
return err
}
err := coder.EncodePane(p, w)
return err
return coder.EncodePane(p, w)
}

// DecodeWindowedValueHeader deserializes a windowed value header.
Expand Down Expand Up @@ -1257,49 +1256,61 @@ func DecodeWindowedValueHeader(dec WindowDecoder, r io.Reader) ([]typex.Window,
}

// encodeTimer encodes a TimerRecv into a byte stream.
// Avoids partial writes to provided writer on encoding errors.
func encodeTimer(elm ElementEncoder, win WindowEncoder, tm TimerRecv, w io.Writer) error {
var b bytes.Buffer
err := elm.Encode(tm.Key, &b)
if err != nil {
return errors.WithContext(err, "error encoding key")
}

if err := coder.EncodeStringUTF8(tm.Tag, &b); err != nil {
if err := encodeTimerSuffix(win, tm, &b); err != nil {
return err
}
w.Write(b.Bytes())

return nil
}

// encodeTimerSuffix enccodes the timer directly to the provided writer.
func encodeTimerSuffix(win WindowEncoder, tm TimerRecv, w io.Writer) error {
if err := coder.EncodeStringUTF8(tm.Tag, w); err != nil {
return errors.WithContext(err, "error encoding tag")
}

if err := win.Encode(tm.Windows, &b); err != nil {
if err := win.Encode(tm.Windows, w); err != nil {
return errors.WithContext(err, "error encoding window")
}

if err := coder.EncodeBool(tm.Clear, &b); err != nil {
if err := coder.EncodeBool(tm.Clear, w); err != nil {
return errors.WithContext(err, "error encoding clear bit")
}

if !tm.Clear {
if err := coder.EncodeEventTime(tm.FireTimestamp, &b); err != nil {
if err := coder.EncodeEventTime(tm.FireTimestamp, w); err != nil {
return errors.WithContext(err, "error encoding fire timestamp")
}
if err := coder.EncodeEventTime(tm.HoldTimestamp, &b); err != nil {
if err := coder.EncodeEventTime(tm.HoldTimestamp, w); err != nil {
return errors.WithContext(err, "error encoding hold timestamp")
}
if err := coder.EncodePane(tm.Pane, &b); err != nil {
if err := coder.EncodePane(tm.Pane, w); err != nil {
return errors.WithContext(err, "error encoding paneinfo")
}
}
w.Write(b.Bytes())

return nil
}

// decodeTimer decodes timer byte encoded with standard timer coder spec.
func decodeTimer(dec ElementDecoder, win WindowDecoder, r io.Reader) (TimerRecv, error) {
tm := TimerRecv{}
key, err := dec.Decode(r)
var keyBuf bytes.Buffer
tr := io.TeeReader(r, &keyBuf)
key, err := dec.Decode(tr)
if err != nil {
return tm, errors.WithContext(err, "error decoding key")
}
tm.Key = key
tm.KeyString = keyBuf.String()

s, err := coder.DecodeStringUTF8(r)
if err != nil && err != io.EOF {
Expand Down
6 changes: 2 additions & 4 deletions sdks/go/pkg/beam/core/runtime/exec/datasource.go
Original file line number Diff line number Diff line change
Expand Up @@ -263,10 +263,8 @@ func (n *DataSource) Process(ctx context.Context) ([]*Checkpoint, error) {
}
},
func(bcr *byteCountReader, ptransformID, timerFamilyID string) error {

if fn, ok := n.OnTimerTransforms[ptransformID].Fn.OnTimerFn(); ok {
_, err := n.OnTimerTransforms[ptransformID].InvokeTimerFn(ctx, fn, timerFamilyID, bcr)
if err != nil {
if node, ok := n.OnTimerTransforms[ptransformID]; ok {
if err := node.ProcessTimers(timerFamilyID, bcr); err != nil {
log.Warnf(ctx, "expected transform %v to have an OnTimer method attached to handle"+
"Timer Family ID: %v callback, but it did not. Please file an issue with Apache Beam"+
"if you have defined OnTimer method with reproducible code at https://github.com/apache/beam/issues", ptransformID, timerFamilyID)
Expand Down
23 changes: 22 additions & 1 deletion sdks/go/pkg/beam/core/runtime/exec/datasource_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
package exec

import (
"bytes"
"context"
"fmt"
"io"
Expand Down Expand Up @@ -1020,6 +1021,8 @@ func runOnRoots(ctx context.Context, t *testing.T, p *Plan, name string, mthd fu

type TestDataManager struct {
Ch chan Elements

TimerWrites map[string]*bytes.Buffer
}

func (dm *TestDataManager) OpenElementChan(ctx context.Context, id StreamID, expectedTimerTransforms []string) (<-chan Elements, error) {
Expand All @@ -1031,9 +1034,27 @@ func (dm *TestDataManager) OpenWrite(ctx context.Context, id StreamID) (io.Write
}

func (dm *TestDataManager) OpenTimerWrite(ctx context.Context, id StreamID, family string) (io.WriteCloser, error) {
return nil, nil
if dm.TimerWrites == nil {
dm.TimerWrites = map[string]*bytes.Buffer{}
}
buf, ok := dm.TimerWrites[family]
if !ok {
buf = &bytes.Buffer{}
dm.TimerWrites[family] = buf
}
return struct {
*bytes.Buffer
io.Closer
}{
Buffer: buf,
Closer: noopCloser{},
}, nil
}

type noopCloser struct{}

func (noopCloser) Close() error { return nil }

type chanWriter struct {
Ch chan Elements
Buf []byte
Expand Down
8 changes: 2 additions & 6 deletions sdks/go/pkg/beam/core/runtime/exec/fn.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ type InvokeOpts struct {
we sdf.WatermarkEstimator
sa UserStateAdapter
sr StateReader
ta UserTimerAdapter
ta *userTimerAdapter
tm DataManager
extra []any
}
Expand Down Expand Up @@ -242,11 +242,7 @@ func (n *invoker) invokeWithOpts(ctx context.Context, pn typex.PaneInfo, ws []ty
}

if n.tpIdx >= 0 {
tp, err := opts.ta.NewTimerProvider(ctx, opts.tm, ts, ws, opts.opt)
if err != nil {
return nil, err
}
n.tp = &tp
n.tp = opts.ta.NewTimerProvider(pn, ws)
args[n.tpIdx] = n.tp
}

Expand Down
Loading

0 comments on commit 3cbe910

Please sign in to comment.