diff --git a/sdks/go/examples/timer_wordcap/wordcap.go b/sdks/go/examples/timer_wordcap/wordcap.go index 4d1faac50396..db64c10eb4d7 100644 --- a/sdks/go/examples/timer_wordcap/wordcap.go +++ b/sdks/go/examples/timer_wordcap/wordcap.go @@ -21,15 +21,12 @@ package main import ( - "bytes" "context" - "encoding/binary" "flag" "fmt" "time" "github.com/apache/beam/sdks/v2/go/pkg/beam" - "github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/mtime" "github.com/apache/beam/sdks/v2/go/pkg/beam/core/state" "github.com/apache/beam/sdks/v2/go/pkg/beam/core/timers" "github.com/apache/beam/sdks/v2/go/pkg/beam/log" @@ -62,21 +59,7 @@ func NewStateful() *Stateful { } } -func (s *Stateful) OnTimer(ctx context.Context, ts beam.EventTime, tp timers.Provider, key, timerKey, timerTag string, emit func(string, string)) { - switch timerKey { - case "outputState": - log.Infof(ctx, "Timer outputState fired on stateful for element: %v.", key) - s.OutputState.Set(tp, ts.ToTime().Add(5*time.Second), timers.WithTag("1")) - switch timerTag { - case "1": - s.OutputState.Clear(tp) - log.Infof(ctx, "Timer with tag 1 fired on outputState stateful DoFn.") - emit(timerKey, timerTag) - } - } -} - -func (s *Stateful) ProcessElement(ctx context.Context, ts beam.EventTime, sp state.Provider, tp timers.Provider, key, word string, emit func(string, string)) error { +func (s *Stateful) ProcessElement(ctx context.Context, ts beam.EventTime, sp state.Provider, tp timers.Provider, key, word string, _ func(beam.EventTime, string, string)) error { s.ElementBag.Add(sp, word) s.MinTime.Add(sp, int64(ts)) @@ -85,23 +68,77 @@ func (s *Stateful) ProcessElement(ctx context.Context, ts beam.EventTime, sp sta return err } if !ok { - toFire = int64(mtime.Now().Add(1 * time.Minute)) + toFire = int64(time.Now().Add(30 * time.Second).UnixMilli()) } minTime, _, err := s.MinTime.Read(sp) if err != nil { return err } - s.OutputState.Set(tp, time.UnixMilli(toFire), timers.WithOutputTimestamp(time.UnixMilli(minTime)), 
timers.WithTag(word)) + s.OutputState.Set(tp, time.UnixMilli(toFire), timers.WithOutputTimestamp(time.UnixMilli(minTime))) + // The same timer family can be set to fire independently under distinct string tags. + s.OutputState.Set(tp, time.UnixMilli(toFire), timers.WithTag(word), timers.WithOutputTimestamp(time.UnixMilli(minTime))) s.TimerTime.Write(sp, toFire) - return nil } +func (s *Stateful) OnTimer(ctx context.Context, ts beam.EventTime, sp state.Provider, tp timers.Provider, key string, timer timers.Context, emit func(beam.EventTime, string, string)) { + log.Infof(ctx, "Timer fired for key %q, for family %q and tag %q", key, timer.Family, timer.Tag) + + const tag = "emit" // Tags can be arbitrary strings, but we're associating behavior with this tag in this method. + + // Check which timer has fired. + switch timer.Family { + case s.OutputState.Family: + switch timer.Tag { + case "": + // Timers can be set within the OnTimer method. + // In this case, set the emit tag timer to fire in 5 seconds. + s.OutputState.Set(tp, ts.ToTime().Add(5*time.Second), timers.WithTag(tag)) + case tag: + // When the emit tag fires, read the batched data. + es, ok, err := s.ElementBag.Read(sp) + if err != nil { + log.Errorf(ctx, "error reading ElementBag: %v", err) + return + } + if !ok { + log.Infof(ctx, "No elements in bag.") + return + } + minTime, _, err := s.MinTime.Read(sp) + if err != nil { + log.Errorf(ctx, "error reading ElementBag: %v", err) + return + } + log.Infof(ctx, "Emitting %d elements", len(es)) + for _, word := range es { + emit(beam.EventTime(minTime), key, word) + } + // Clean up the state that has been evicted. + s.ElementBag.Clear(sp) + s.MinTime.Clear(sp) + s.OutputState.ClearTag(tp, tag) // Clean up the fired timer tag. (Temporary workaround for a runner bug.) 
+ } + } +} + func init() { - register.DoFn7x1[context.Context, beam.EventTime, state.Provider, timers.Provider, string, string, func(string, string), error](&Stateful{}) - register.Emitter2[string, string]() + register.DoFn7x1[context.Context, beam.EventTime, state.Provider, timers.Provider, string, string, func(beam.EventTime, string, string), error](&Stateful{}) + register.Emitter3[beam.EventTime, string, string]() register.Emitter2[beam.EventTime, int64]() + register.Function1x2(toKeyedString) +} + +func generateSequence(s beam.Scope, now time.Time, duration, interval time.Duration) beam.PCollection { + s = s.Scope("generateSequence") + def := beam.Create(s, periodic.NewSequenceDefinition(now, now.Add(duration), interval)) + seq := periodic.Sequence(s, def) + return seq +} + +func toKeyedString(b int64) (string, string) { + return "test", fmt.Sprintf("%03d", b) } func main() { @@ -110,26 +147,11 @@ func main() { ctx := context.Background() - p := beam.NewPipeline() - s := p.Root() - - out := periodic.Impulse(s, time.Now(), time.Now().Add(5*time.Minute), 5*time.Second, true) - - intOut := beam.ParDo(s, func(b []byte) int64 { - var val int64 - buf := bytes.NewReader(b) - binary.Read(buf, binary.BigEndian, &val) - return val - }, out) - - str := beam.ParDo(s, func(b int64) string { - return fmt.Sprintf("%03d", b) - }, intOut) + p, s := beam.NewPipelineWithRoot() - keyed := beam.ParDo(s, func(ctx context.Context, ts beam.EventTime, s string) (string, string) { - return "test", s - }, str) + out := generateSequence(s, time.Now(), 1*time.Minute, 5*time.Second) + keyed := beam.ParDo(s, toKeyedString, out) timed := beam.ParDo(s, NewStateful(), keyed) debug.Printf(s, "post stateful: %v", timed) diff --git a/sdks/go/pkg/beam/core/graph/edge.go b/sdks/go/pkg/beam/core/graph/edge.go index a656cc9fe61e..a9f1c8a092b0 100644 --- a/sdks/go/pkg/beam/core/graph/edge.go +++ b/sdks/go/pkg/beam/core/graph/edge.go @@ -156,7 +156,6 @@ type MultiEdge struct { DoFn *DoFn // ParDo 
RestrictionCoder *coder.Coder // SplittableParDo StateCoders map[string]*coder.Coder // Stateful ParDo - TimerCoders *coder.Coder // Stateful ParDo CombineFn *CombineFn // Combine AccumCoder *coder.Coder // Combine Value []byte // Impulse diff --git a/sdks/go/pkg/beam/core/runtime/exec/coder.go b/sdks/go/pkg/beam/core/runtime/exec/coder.go index f2a9ff728e38..2c21ebea56b5 100644 --- a/sdks/go/pkg/beam/core/runtime/exec/coder.go +++ b/sdks/go/pkg/beam/core/runtime/exec/coder.go @@ -1228,8 +1228,7 @@ func EncodeWindowedValueHeader(enc WindowEncoder, ws []typex.Window, t typex.Eve if err := enc.Encode(ws, w); err != nil { return err } - err := coder.EncodePane(p, w) - return err + return coder.EncodePane(p, w) } // DecodeWindowedValueHeader deserializes a windowed value header. @@ -1257,6 +1256,7 @@ func DecodeWindowedValueHeader(dec WindowDecoder, r io.Reader) ([]typex.Window, } // encodeTimer encodes a TimerRecv into a byte stream. +// Avoids partial writes to provided writer on encoding errors. func encodeTimer(elm ElementEncoder, win WindowEncoder, tm TimerRecv, w io.Writer) error { var b bytes.Buffer err := elm.Encode(tm.Key, &b) @@ -1264,42 +1264,53 @@ func encodeTimer(elm ElementEncoder, win WindowEncoder, tm TimerRecv, w io.Write return errors.WithContext(err, "error encoding key") } - if err := coder.EncodeStringUTF8(tm.Tag, &b); err != nil { + if err := encodeTimerSuffix(win, tm, &b); err != nil { + return err + } + w.Write(b.Bytes()) + + return nil +} + +// encodeTimerSuffix encodes the timer directly to the provided writer. 
+func encodeTimerSuffix(win WindowEncoder, tm TimerRecv, w io.Writer) error { + if err := coder.EncodeStringUTF8(tm.Tag, w); err != nil { return errors.WithContext(err, "error encoding tag") } - if err := win.Encode(tm.Windows, &b); err != nil { + if err := win.Encode(tm.Windows, w); err != nil { return errors.WithContext(err, "error encoding window") } - if err := coder.EncodeBool(tm.Clear, &b); err != nil { + if err := coder.EncodeBool(tm.Clear, w); err != nil { return errors.WithContext(err, "error encoding clear bit") } if !tm.Clear { - if err := coder.EncodeEventTime(tm.FireTimestamp, &b); err != nil { + if err := coder.EncodeEventTime(tm.FireTimestamp, w); err != nil { return errors.WithContext(err, "error encoding fire timestamp") } - if err := coder.EncodeEventTime(tm.HoldTimestamp, &b); err != nil { + if err := coder.EncodeEventTime(tm.HoldTimestamp, w); err != nil { return errors.WithContext(err, "error encoding hold timestamp") } - if err := coder.EncodePane(tm.Pane, &b); err != nil { + if err := coder.EncodePane(tm.Pane, w); err != nil { return errors.WithContext(err, "error encoding paneinfo") } } - w.Write(b.Bytes()) - return nil } // decodeTimer decodes timer byte encoded with standard timer coder spec. 
func decodeTimer(dec ElementDecoder, win WindowDecoder, r io.Reader) (TimerRecv, error) { tm := TimerRecv{} - key, err := dec.Decode(r) + var keyBuf bytes.Buffer + tr := io.TeeReader(r, &keyBuf) + key, err := dec.Decode(tr) if err != nil { return tm, errors.WithContext(err, "error decoding key") } tm.Key = key + tm.KeyString = keyBuf.String() s, err := coder.DecodeStringUTF8(r) if err != nil && err != io.EOF { diff --git a/sdks/go/pkg/beam/core/runtime/exec/datasource.go b/sdks/go/pkg/beam/core/runtime/exec/datasource.go index 40c88134a754..4c3192949139 100644 --- a/sdks/go/pkg/beam/core/runtime/exec/datasource.go +++ b/sdks/go/pkg/beam/core/runtime/exec/datasource.go @@ -263,10 +263,8 @@ func (n *DataSource) Process(ctx context.Context) ([]*Checkpoint, error) { } }, func(bcr *byteCountReader, ptransformID, timerFamilyID string) error { - - if fn, ok := n.OnTimerTransforms[ptransformID].Fn.OnTimerFn(); ok { - _, err := n.OnTimerTransforms[ptransformID].InvokeTimerFn(ctx, fn, timerFamilyID, bcr) - if err != nil { + if node, ok := n.OnTimerTransforms[ptransformID]; ok { + if err := node.ProcessTimers(timerFamilyID, bcr); err != nil { log.Warnf(ctx, "expected transform %v to have an OnTimer method attached to handle"+ "Timer Family ID: %v callback, but it did not. 
Please file an issue with Apache Beam"+ "if you have defined OnTimer method with reproducible code at https://github.com/apache/beam/issues", ptransformID, timerFamilyID) diff --git a/sdks/go/pkg/beam/core/runtime/exec/datasource_test.go b/sdks/go/pkg/beam/core/runtime/exec/datasource_test.go index 2367a7b292fd..ef3bc53bd267 100644 --- a/sdks/go/pkg/beam/core/runtime/exec/datasource_test.go +++ b/sdks/go/pkg/beam/core/runtime/exec/datasource_test.go @@ -16,6 +16,7 @@ package exec import ( + "bytes" "context" "fmt" "io" @@ -1020,6 +1021,8 @@ func runOnRoots(ctx context.Context, t *testing.T, p *Plan, name string, mthd fu type TestDataManager struct { Ch chan Elements + + TimerWrites map[string]*bytes.Buffer } func (dm *TestDataManager) OpenElementChan(ctx context.Context, id StreamID, expectedTimerTransforms []string) (<-chan Elements, error) { @@ -1031,9 +1034,27 @@ func (dm *TestDataManager) OpenWrite(ctx context.Context, id StreamID) (io.Write } func (dm *TestDataManager) OpenTimerWrite(ctx context.Context, id StreamID, family string) (io.WriteCloser, error) { - return nil, nil + if dm.TimerWrites == nil { + dm.TimerWrites = map[string]*bytes.Buffer{} + } + buf, ok := dm.TimerWrites[family] + if !ok { + buf = &bytes.Buffer{} + dm.TimerWrites[family] = buf + } + return struct { + *bytes.Buffer + io.Closer + }{ + Buffer: buf, + Closer: noopCloser{}, + }, nil } +type noopCloser struct{} + +func (noopCloser) Close() error { return nil } + type chanWriter struct { Ch chan Elements Buf []byte diff --git a/sdks/go/pkg/beam/core/runtime/exec/fn.go b/sdks/go/pkg/beam/core/runtime/exec/fn.go index 0655b0f08028..35b7e7cb3b9a 100644 --- a/sdks/go/pkg/beam/core/runtime/exec/fn.go +++ b/sdks/go/pkg/beam/core/runtime/exec/fn.go @@ -87,7 +87,7 @@ type InvokeOpts struct { we sdf.WatermarkEstimator sa UserStateAdapter sr StateReader - ta UserTimerAdapter + ta *userTimerAdapter tm DataManager extra []any } @@ -242,11 +242,7 @@ func (n *invoker) invokeWithOpts(ctx context.Context, 
pn typex.PaneInfo, ws []ty } if n.tpIdx >= 0 { - tp, err := opts.ta.NewTimerProvider(ctx, opts.tm, ts, ws, opts.opt) - if err != nil { - return nil, err - } - n.tp = &tp + n.tp = opts.ta.NewTimerProvider(pn, ws) args[n.tpIdx] = n.tp } diff --git a/sdks/go/pkg/beam/core/runtime/exec/pardo.go b/sdks/go/pkg/beam/core/runtime/exec/pardo.go index 652a904d16f5..212ff53b6dd8 100644 --- a/sdks/go/pkg/beam/core/runtime/exec/pardo.go +++ b/sdks/go/pkg/beam/core/runtime/exec/pardo.go @@ -17,7 +17,9 @@ package exec import ( "context" + goerrors "errors" "fmt" + "io" "path" "reflect" @@ -27,6 +29,7 @@ import ( "github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/window" "github.com/apache/beam/sdks/v2/go/pkg/beam/core/metrics" "github.com/apache/beam/sdks/v2/go/pkg/beam/core/sdf" + "github.com/apache/beam/sdks/v2/go/pkg/beam/core/timers" "github.com/apache/beam/sdks/v2/go/pkg/beam/core/typex" "github.com/apache/beam/sdks/v2/go/pkg/beam/internal/errors" "github.com/apache/beam/sdks/v2/go/pkg/beam/util/errorx" @@ -34,12 +37,13 @@ import ( // ParDo is a DoFn executor. type ParDo struct { - UID UnitID - Fn *graph.DoFn - Inbound []*graph.Inbound - Side []SideInputAdapter - UState UserStateAdapter - Out []Node + UID UnitID + Fn *graph.DoFn + Inbound []*graph.Inbound + Side []SideInputAdapter + UState UserStateAdapter + TimerTracker *userTimerAdapter + Out []Node PID string emitters []ReusableEmitter @@ -49,7 +53,6 @@ type ParDo struct { we sdf.WatermarkEstimator onTimerInvoker *invoker - Timer UserTimerAdapter timerManager DataManager reader StateReader cache *cacheElm @@ -79,7 +82,7 @@ func (n *ParDo) ID() UnitID { // HasOnTimer returns if this ParDo wraps a DoFn that has an OnTimer method. func (n *ParDo) HasOnTimer() bool { - return n.Timer != nil + return n.TimerTracker != nil } // Up initializes this ParDo and does one-time DoFn setup. 
@@ -158,6 +161,7 @@ func (n *ParDo) ProcessElement(_ context.Context, elm *FullValue, values ...ReSt // a ParDo's ProcessElement functionality with their own construction of // MainInputs. func (n *ParDo) processMainInput(mainIn *MainInput) error { + n.TimerTracker.SetCurrentKey(mainIn) elm := &mainIn.Key // If the function observes windows or uses per window state, we must invoke it for each window. @@ -249,6 +253,10 @@ func (n *ParDo) FinishBundle(_ context.Context) error { if _, err := n.invokeDataFn(n.ctx, typex.NoFiringPane(), window.SingleGlobalWindow, mtime.ZeroTimestamp, n.Fn.FinishBundleFn(), nil); err != nil { return n.fail(err) } + // Flush timers if any. + if err := n.TimerTracker.FlushAndReset(n.ctx, n.timerManager); err != nil { + return n.fail(err) + } n.reader = nil n.cache = nil n.timerManager = nil @@ -362,47 +370,128 @@ func (n *ParDo) invokeDataFn(ctx context.Context, pn typex.PaneInfo, ws []typex. return val, nil } -func (n *ParDo) InvokeTimerFn(ctx context.Context, fn *funcx.Fn, timerFamilyID string, bcr *byteCountReader) (*FullValue, error) { - timerAdapter, ok := n.Timer.(*userTimerAdapter) - if !ok { - return nil, fmt.Errorf("userTimerAdapter empty for ParDo: %v", n.GetPID()) +// decodeBundleTimers is a helper to decode a batch of timers for a bundle, handling the io.EOF from the reader. +func decodeBundleTimers(spec timerFamilySpec, r io.Reader) ([]TimerRecv, error) { + var bundleTimers []TimerRecv + for { + tmap, err := decodeTimer(spec.KeyDecoder, spec.WinDecoder, r) + if err != nil { + if goerrors.Is(err, io.EOF) { + break + } + return nil, errors.WithContext(err, "error decoding received timer callback") + } + bundleTimers = append(bundleTimers, tmap) } - tmap, err := decodeTimer(timerAdapter.dc, timerAdapter.wc, bcr) + return bundleTimers, nil +} + +// ProcessTimers processes all timers in firing order from the runner for a timer family ID. +// +// A timer refers to a specific combination of Key+Window + Family + Tag. 
They also +// have a firing time, and a data watermark hold time. The SDK doesn't determine +// if a timer is ready to fire or not, that's up to the runner. +// +// This method fires timers in the order from the runner. During this process, the user +// code may set additional firings for one or more timers, which may overwrite orderings +// from the runner. +// +// In particular, if a runner-sent timer produces a new firing that is earlier than a 2nd runner-sent timer, +// then it is processed before that 2nd timer. This will override any subsequent firing of the same timer, +// and as a result, must add a clear to the set of timer modifications. +func (n *ParDo) ProcessTimers(timerFamilyID string, r io.Reader) (err error) { + // Lookup actual domain for family here. + spec := n.TimerTracker.familyToSpec[timerFamilyID] + + bundleTimers, err := decodeBundleTimers(spec, r) if err != nil { - return nil, errors.WithContext(err, "error decoding received timer callback") + return err } + for _, tmap := range bundleTimers { + n.TimerTracker.SetCurrentKeyString(tmap.KeyString) + for i, w := range tmap.Windows { + ws := tmap.Windows[i : i+1] + modifications := n.TimerTracker.GetModifications(windowKeyPair{window: w, key: tmap.KeyString}) + + indexKey := sortableTimer{ + Domain: spec.Domain, + TimerMap: timers.TimerMap{ + Family: timerFamilyID, + Tag: tmap.Tag, + Clear: tmap.Clear, + FireTimestamp: tmap.FireTimestamp, + HoldTimestamp: tmap.HoldTimestamp, + }, + Pane: tmap.Pane, + } + + iter := modifications.earlierTimers[int(spec.Domain)].HeadSetIter(indexKey) + for { + insertedTimer, ok := iter() + if !ok { + break + } + // Check if this timer is still valid (no other timers have overwritten it.) + if modifications.IsModified(insertedTimer) { + continue + } + // If so, add a clear to the set for the tag + family, and process the inserted timer. + // This clear is necessary to prevent double firing the same timer. 
+ // Timers may only have one active expiry (the last one set), and this timer is set + // to fire before the next one in the same bundle. + modifications.modified[timerKey{family: insertedTimer.Family, tag: insertedTimer.Tag}] = insertedTimer.Cleared() + + err := n.processTimer(timerFamilyID, ws, TimerRecv{ + Key: tmap.Key, + KeyString: tmap.KeyString, + Windows: ws, + TimerMap: insertedTimer.TimerMap, + Pane: insertedTimer.Pane, + }) + if err != nil { + return errors.WithContextf(err, "error processing inline timer %v", tmap) + } + } + + // If not modified by inserted timers, execute the timer from the bundle. + if !modifications.IsModified(indexKey) { + // Not modified means we can process the original timer. + err := n.processTimer(timerFamilyID, ws, tmap) + if err != nil { + return errors.WithContextf(err, "error processing timer %v", tmap) + } + } + } + } + return nil +} + +func (n *ParDo) processTimer(timerFamilyID string, singleWindow []typex.Window, tmap TimerRecv) (err error) { // Defer side input clean-up in case of panic defer func() { if postErr := n.postInvoke(); postErr != nil { err = postErr } }() - if err := n.preInvoke(ctx, tmap.Windows, tmap.HoldTimestamp); err != nil { - return nil, err + if err := n.preInvoke(n.ctx, singleWindow, tmap.HoldTimestamp); err != nil { + return err } var extra []any - extra = append(extra, timerFamilyID) - - if tmap.Tag != "" { - extra = append(extra, tmap.Tag) - } + extra = append(extra, timers.Context{Family: timerFamilyID, Tag: tmap.Tag}) extra = append(extra, n.cache.extra...) 
- val, err := n.onTimerInvoker.invokeWithOpts(ctx, tmap.Pane, tmap.Windows, tmap.HoldTimestamp, InvokeOpts{ + _, err = n.onTimerInvoker.invokeWithOpts(n.ctx, tmap.Pane, singleWindow, tmap.HoldTimestamp, InvokeOpts{ opt: &MainInput{Key: *tmap.Key}, bf: n.bf, we: n.we, sa: n.UState, sr: n.reader, - ta: n.Timer, + ta: n.TimerTracker, tm: n.timerManager, extra: extra, }) - if err != nil { - return nil, err - } - return val, err + return err } // invokeProcessFn handles the per element invocations @@ -416,7 +505,7 @@ func (n *ParDo) invokeProcessFn(ctx context.Context, pn typex.PaneInfo, ws []typ if err := n.preInvoke(ctx, ws, ts); err != nil { return nil, err } - val, err = n.inv.invokeWithOpts(ctx, pn, ws, ts, InvokeOpts{opt: opt, bf: n.bf, we: n.we, sa: n.UState, sr: n.reader, ta: n.Timer, tm: n.timerManager, extra: n.cache.extra}) + val, err = n.inv.invokeWithOpts(ctx, pn, ws, ts, InvokeOpts{opt: opt, bf: n.bf, we: n.we, sa: n.UState, sr: n.reader, ta: n.TimerTracker, tm: n.timerManager, extra: n.cache.extra}) if err != nil { return nil, err } diff --git a/sdks/go/pkg/beam/core/runtime/exec/pardo_test.go b/sdks/go/pkg/beam/core/runtime/exec/pardo_test.go index b228bfd0d7e4..9c322a38165c 100644 --- a/sdks/go/pkg/beam/core/runtime/exec/pardo_test.go +++ b/sdks/go/pkg/beam/core/runtime/exec/pardo_test.go @@ -16,19 +16,24 @@ package exec import ( + "bytes" "context" "errors" "fmt" "strings" "testing" + "time" "github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph" + "github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/coder" "github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/mtime" "github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/window" "github.com/apache/beam/sdks/v2/go/pkg/beam/core/sdf" + "github.com/apache/beam/sdks/v2/go/pkg/beam/core/timers" "github.com/apache/beam/sdks/v2/go/pkg/beam/core/typex" "github.com/apache/beam/sdks/v2/go/pkg/beam/core/util/reflectx" "github.com/apache/beam/sdks/v2/go/pkg/beam/io/rtrackers/offsetrange" + 
"github.com/google/go-cmp/cmp" ) func sumFn(n int, a int, b []int, c func(*int) bool, d func() func(*int) bool, e func(int)) int { @@ -458,6 +463,127 @@ func TestProcessSingleWindow_dataLossCase(t *testing.T) { } } +// hasTimers is to do unit testing for ProcessTimers, and does not represent +// a canonical use of timers in a DoFn. Do not use as a starting point. +type hasTimers struct { + Timer timers.EventTime + + emitOnlyOnTag bool +} + +func (fn *hasTimers) ProcessElement(tp timers.Provider, k, v string, emit func(string)) { +} + +func (fn *hasTimers) OnTimer(ts typex.EventTime, tp timers.Provider, k string, timer timers.Context, emit func(string)) { + if timer.Tag == "" { + // Sets another timer, but should be sent to the runner. + fn.Timer.Set(tp, ts.ToTime().Add(-10*time.Millisecond), timers.WithTag("tag")) + } + + if (timer.Tag != "" && fn.emitOnlyOnTag) || !fn.emitOnlyOnTag { + emit(k) + } +} + +func TestProcessTimers(t *testing.T) { + tests := []struct { + name string + inputFn any + timerKeys []any + wantEmitted, wantSet []any + Coder *coder.Coder + }{ + { + name: "onTimer- different keys", + Coder: coder.NewT(coder.NewString(), coder.NewGlobalWindow()), + inputFn: &hasTimers{Timer: timers.InEventTime("testTimer"), emitOnlyOnTag: false}, + timerKeys: []any{"1", "2", "3", "4", "5"}, + wantEmitted: []any{"1", "2", "3", "4", "5"}, + }, { + name: "onTimer- same keys", + Coder: coder.NewT(coder.NewString(), coder.NewGlobalWindow()), + inputFn: &hasTimers{Timer: timers.InEventTime("testTimer"), emitOnlyOnTag: true}, + timerKeys: []any{"1", "2", "1", "3", "1", "2", "1", "1"}, + wantEmitted: []any{"1", "1", "2", "1", "1"}, // We only emit on inline firings, + wantSet: []any{"1", "2", "3"}, + }, + } + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + fn, err := graph.NewDoFn(test.inputFn) + if err != nil { + t.Fatalf("invalid function %v", err) + } + g := graph.New() + nN := g.NewNode(typex.NewKV(typex.New(reflectx.String), 
typex.New(reflectx.String)), window.DefaultWindowingStrategy(), true) + + edge, err := graph.NewParDo(g, g.Root(), fn, []*graph.Node{nN}, nil, nil) + if err != nil { + t.Fatalf("invalid pardo: %v", err) + } + + out := &CaptureNode{UID: 1} + + ta := newUserTimerAdapter(StreamID{}, map[string]timerFamilySpec{ + "testTimer": { + Domain: timers.EventTimeDomain, + KeyEncoder: MakeElementEncoder(test.Coder.Components[0]), + KeyDecoder: MakeElementDecoder(test.Coder.Components[0]), + WinEncoder: MakeWindowEncoder(test.Coder.Window), + WinDecoder: MakeWindowDecoder(test.Coder.Window), + }, + }) + pardo := &ParDo{UID: 2, Fn: edge.DoFn, Inbound: edge.Input, Out: []Node{out}, TimerTracker: ta} + + now := mtime.Now() + tc := MakeElementEncoder(test.Coder) + var buf bytes.Buffer + for _, v := range test.timerKeys { + timer := TimerRecv{Key: &FullValue{Elm: v}, Windows: window.SingleGlobalWindow, Pane: typex.NoFiringPane(), + TimerMap: timers.TimerMap{ + Family: "testTimer", + Clear: false, + FireTimestamp: now, + HoldTimestamp: now, + }} + if err := tc.Encode(&FullValue{Elm: timer}, &buf); err != nil { + t.Fatalf("failed to encode timer for key %v", v) + } + } + + if err := pardo.Up(context.Background()); err != nil { + t.Fatalf("pardo.Up failed: %v", err) + } + if err := out.Up(context.Background()); err != nil { + t.Fatalf("capture.Up failed: %v", err) + } + if err := pardo.StartBundle(context.Background(), "testID", DataContext{}); err != nil { + t.Fatalf("pardo.StartBundle failed: %v", err) + } + if err := pardo.ProcessTimers("testTimer", &buf); err != nil { + t.Errorf("ProcessTimers failed when it should have succeeded: %v", err) + } + if diff := cmp.Diff(test.wantEmitted, extractValues(out.Elements...)); diff != "" { + t.Errorf("ParDo.ProcessTimers diff: \n%v", diff) + } + + // Hard check modifications for expected writes. 
+ for _, key := range test.wantSet { + ks := key.(string) + pair := windowKeyPair{key: string(append([]byte{byte(len(ks))}, []byte(ks)...)), window: window.GlobalWindow{}} + mods, ok := pardo.TimerTracker.modifications[pair] + if !ok { + t.Errorf("can't find modified timer for pair %v for lenMods: %v", pair, pardo.TimerTracker.modifications) + } + _, ok = mods.modified[timerKey{family: "testTimer", tag: "tag"}] + if !ok { + t.Errorf("can't find modified timer for pair %v for testTimer+tag", pair) + } + } + }) + } +} + func emitSumFn(n int, emit func(int)) { emit(n + 1) } diff --git a/sdks/go/pkg/beam/core/runtime/exec/timers.go b/sdks/go/pkg/beam/core/runtime/exec/timers.go index 0303efc270c3..136834389776 100644 --- a/sdks/go/pkg/beam/core/runtime/exec/timers.go +++ b/sdks/go/pkg/beam/core/runtime/exec/timers.go @@ -16,112 +16,314 @@ package exec import ( + "bytes" + "container/heap" "context" - "fmt" "io" "github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/coder" - "github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/mtime" "github.com/apache/beam/sdks/v2/go/pkg/beam/core/timers" "github.com/apache/beam/sdks/v2/go/pkg/beam/core/typex" + "github.com/apache/beam/sdks/v2/go/pkg/beam/internal/errors" ) -// UserTimerAdapter provides a timer provider to be used for manipulating timers. 
-type UserTimerAdapter interface { - NewTimerProvider(ctx context.Context, manager DataManager, inputTimestamp typex.EventTime, windows []typex.Window, element *MainInput) (timerProvider, error) +type userTimerAdapter struct { + sID StreamID + familyToSpec map[string]timerFamilySpec + modifications map[windowKeyPair]*timerModifications + + currentKey any + keyEncoded bool + buf bytes.Buffer + currentKeyString string } -type userTimerAdapter struct { - sID StreamID - ec ElementEncoder - dc ElementDecoder - wc WindowDecoder +type timerFamilySpec struct { + Domain timers.TimeDomain + KeyEncoder ElementEncoder + KeyDecoder ElementDecoder + WinEncoder WindowEncoder + WinDecoder WindowDecoder } -// NewUserTimerAdapter returns a user timer adapter for the given StreamID and timer coder. -func NewUserTimerAdapter(sID StreamID, c *coder.Coder, timerCoder *coder.Coder) UserTimerAdapter { - if !coder.IsW(c) { - panic(fmt.Sprintf("expected WV coder for user timer %v: %v", sID, c)) +func newTimerFamilySpec(domain timers.TimeDomain, timerCoder *coder.Coder) timerFamilySpec { + keyCoder := timerCoder.Components[0] + return timerFamilySpec{ + Domain: domain, + KeyEncoder: MakeElementEncoder(keyCoder), + KeyDecoder: MakeElementDecoder(keyCoder), + WinEncoder: MakeWindowEncoder(timerCoder.Window), + WinDecoder: MakeWindowDecoder(timerCoder.Window), } - ec := MakeElementEncoder(timerCoder) - dc := MakeElementDecoder(coder.SkipW(c).Components[0]) - wc := MakeWindowDecoder(c.Window) - return &userTimerAdapter{sID: sID, ec: ec, wc: wc, dc: dc} } -// NewTimerProvider creates and returns a timer provider to set/clear timers. 
-func (u *userTimerAdapter) NewTimerProvider(ctx context.Context, manager DataManager, inputTs typex.EventTime, w []typex.Window, element *MainInput) (timerProvider, error) { - userKey := &FullValue{Elm: element.Key.Elm} - tp := timerProvider{ - ctx: ctx, - tm: manager, - userKey: userKey, - inputTimestamp: inputTs, - sID: u.sID, - window: w, - writersByFamily: make(map[string]io.Writer), - timerElementEncoder: u.ec, - keyElementDecoder: u.dc, +// newUserTimerAdapter returns a user timer adapter for the given StreamID and per-family timer specs. +func newUserTimerAdapter(sID StreamID, familyToSpec map[string]timerFamilySpec) *userTimerAdapter { + return &userTimerAdapter{sID: sID, familyToSpec: familyToSpec} +} + +// SetCurrentKey keeps the key around so it can be encoded if needed for timers. +func (u *userTimerAdapter) SetCurrentKey(mainIn *MainInput) { + if u == nil { + return } + u.currentKey = mainIn.Key.Elm + u.keyEncoded = false +} - return tp, nil +// SetCurrentKeyString is for processing timer callbacks, and avoids re-encoding the key. +func (u *userTimerAdapter) SetCurrentKeyString(key string) { + if u == nil { + return + } + u.currentKeyString = key + u.keyEncoded = true } -type timerProvider struct { - ctx context.Context - tm DataManager - sID StreamID - inputTimestamp typex.EventTime - userKey *FullValue - window []typex.Window +// GetKeyStringAndDomain encodes the current key with the family's encoder, and stores the string +// for later access. It returns that string along with the family's time domain. 
+func (u *userTimerAdapter) GetKeyStringAndDomain(family string) (string, timers.TimeDomain) { + if u.keyEncoded { + return u.currentKeyString, u.familyToSpec[family].Domain + } + spec := u.familyToSpec[family] - pn typex.PaneInfo + u.buf.Reset() + if err := spec.KeyEncoder.Encode(&FullValue{Elm: u.currentKey}, &u.buf); err != nil { + panic(err) + } + u.currentKeyString = u.buf.String() + u.keyEncoded = true + return u.currentKeyString, spec.Domain +} - writersByFamily map[string]io.Writer - timerElementEncoder ElementEncoder - keyElementDecoder ElementDecoder +// NewTimerProvider creates and returns a timer provider to set/clear timers. +func (u *userTimerAdapter) NewTimerProvider(pane typex.PaneInfo, w []typex.Window) *timerProvider { + return &timerProvider{ + window: w, + pane: pane, + adapter: u, + } } -func (p *timerProvider) getWriter(family string) (io.Writer, error) { - if w, ok := p.writersByFamily[family]; ok { - return w, nil +func (u *userTimerAdapter) GetModifications(key windowKeyPair) *timerModifications { + if u.modifications == nil { + u.modifications = map[windowKeyPair]*timerModifications{} } - w, err := p.tm.OpenTimerWrite(p.ctx, p.sID, family) - if err != nil { - return nil, err + mods, ok := u.modifications[key] + if !ok { + mods = &timerModifications{ + modified: map[timerKey]sortableTimer{}, + } + u.modifications[key] = mods } - p.writersByFamily[family] = w - return p.writersByFamily[family], nil + return mods +} + +// FlushAndReset writes all outstanding modified timers to the datamanager. 
+func (u *userTimerAdapter) FlushAndReset(ctx context.Context, manager DataManager) error { + if u == nil { + return nil + } + writersByFamily := map[string]io.Writer{} + + var b bytes.Buffer + for windowKeyPair, mods := range u.modifications { + for id, timer := range mods.modified { + spec := u.familyToSpec[id.family] + w, ok := writersByFamily[id.family] + if !ok { + var err error + w, err = manager.OpenTimerWrite(ctx, u.sID, id.family) + if err != nil { + return err + } + writersByFamily[id.family] = w + } + b.Reset() + b.Write([]byte(windowKeyPair.key)) + + if err := encodeTimerSuffix(spec.WinEncoder, TimerRecv{ + TimerMap: timer.TimerMap, + Windows: []typex.Window{windowKeyPair.window}, + Pane: timer.Pane, + }, &b); err != nil { + return errors.WithContextf(err, "error writing timer family %v, tag %v", timer.Family, timer.Tag) + } + w.Write(b.Bytes()) + } + } + + u.modifications = nil + u.currentKey = nil + u.currentKeyString = "" + u.keyEncoded = false + return nil +} + +type timerProvider struct { + window []typex.Window + pane typex.PaneInfo + + adapter *userTimerAdapter } // Set writes a new timer. This can be used to both Set as well as Clear the timer. // Note: This function is intended for internal use only. func (p *timerProvider) Set(t timers.TimerMap) { - w, err := p.getWriter(t.Family) - if err != nil { - panic(err) - } - tm := TimerRecv{ - Key: p.userKey, - Tag: t.Tag, - Windows: p.window, - Clear: t.Clear, - FireTimestamp: t.FireTimestamp, - HoldTimestamp: t.HoldTimestamp, - Pane: p.pn, - } - fv := FullValue{Elm: tm} - if err := p.timerElementEncoder.Encode(&fv, w); err != nil { - panic(err) + k, domain := p.adapter.GetKeyStringAndDomain(t.Family) + for _, w := range p.window { + modifications := p.adapter.GetModifications(windowKeyPair{w, k}) + + // Make the adapter know the domains of the families. 
+ insertedTimer := sortableTimer{ + Domain: domain, + TimerMap: t, + Pane: p.pane, + } + modifications.InsertTimer(insertedTimer) } } // TimerRecv holds the timer metadata while encoding and decoding timers in exec unit. +// +// For SDK internal use, and subject to change. type TimerRecv struct { - Key *FullValue - Tag string - Windows []typex.Window // []typex.Window - Clear bool - FireTimestamp, HoldTimestamp mtime.Time - Pane typex.PaneInfo + Key *FullValue + KeyString string // The bytes for the key to avoid re-encoding key for lookups. + Windows []typex.Window + Pane typex.PaneInfo + timers.TimerMap // embed common information from set parameter. +} + +// timerHeap is a min heap for inserting user set timers, and allowing for inline +// processing of timers in the bundle. +type timerHeap []sortableTimer + +type sortableTimer struct { + Domain timers.TimeDomain + + timers.TimerMap + + Pane typex.PaneInfo +} + +// Cleared returns the same timer, but in cleared form. +func (st sortableTimer) Cleared() sortableTimer { + // Must be a non-pointer receiver, so this returns a copy. + st.Clear = true + st.FireTimestamp = 0 + st.HoldTimestamp = 0 + st.Pane = typex.PaneInfo{} + return st +} + +func (st sortableTimer) Less(right sortableTimer) bool { + left := st + // Sort Processing time timers before event time timers, as they tend to be more latency sensitive. + // There's also the "unspecified" timer, which is treated like an Event Time at present. + if left.Domain != right.Domain { + return left.Domain == timers.ProcessingTimeDomain && right.Domain != timers.ProcessingTimeDomain + } + + // Sort cleared timers first, so newly written fire-able timers can fire. 
+ if left.Clear != right.Clear { + return left.Clear && !right.Clear + } + + if left.FireTimestamp != right.FireTimestamp { + return left.FireTimestamp < right.FireTimestamp + } + if left.HoldTimestamp != right.HoldTimestamp { + return left.HoldTimestamp < right.HoldTimestamp + } + + return left.Tag < right.Tag +} + +var _ heap.Interface = (*timerHeap)(nil) + +// Len satisfies the sort interface invariant. +func (h timerHeap) Len() int { return len(h) } + +// Less satisfies the sort interface invariant. +func (h timerHeap) Less(i, j int) bool { + left, right := h[i], h[j] + return left.Less(right) +} + +// Swap satisfies the sort interface invariant. +// Intended for use only by the timerHeap itself. +func (h timerHeap) Swap(i, j int) { h[i], h[j] = h[j], h[i] } + +// Push satisfies the heap interface invariant. +// Intended for use only by the timerHeap itself. +func (h *timerHeap) Push(x any) { + // Push and Pop use pointer receivers because they modify the slice's length, + // not just its contents. + *h = append(*h, x.(sortableTimer)) +} + +// Pop satisfies the heap interface invariant. +// Intended for use only by the timerHeap itself. +func (h *timerHeap) Pop() any { + old := *h + n := len(old) + x := old[n-1] + *h = old[0 : n-1] + return x +} + +// Add timers to the heap. +func (h *timerHeap) Add(timer sortableTimer) { + heap.Push(h, timer) +} + +// HeadSetIter gets an iterator for all timers sorted less than or equal to the given timer. +// The iterator function is a view over this timerheap, so changes to this heap will be +// reflected in the iterator. +// +// The iterator is not safe to be used on multiple goroutines. 
+func (h *timerHeap) HeadSetIter(timer sortableTimer) func() (sortableTimer, bool) { + return func() (sortableTimer, bool) { + if h.Len() > 0 && ((*h)[0].Less(timer) || (*h)[0] == timer) { + return heap.Pop(h).(sortableTimer), true + } + return sortableTimer{}, false + } +} + +type timerKey struct { + family, tag string +} + +type windowKeyPair struct { + window typex.Window + key string +} + +type timerModifications struct { + // Track timer modifications per domain. + earlierTimers [3]timerHeap + // TIMER FAMILY + TAG, have the actual updated timers. + modified map[timerKey]sortableTimer +} + +func (tm *timerModifications) InsertTimer(t sortableTimer) { + tm.modified[timerKey{ + family: t.Family, + tag: t.Tag, + }] = t + tm.earlierTimers[t.Domain].Add(t) +} + +func (tm *timerModifications) IsModified(check sortableTimer) bool { + got, ok := tm.modified[timerKey{ + family: check.Family, + tag: check.Tag, + }] + if !ok { + return false + } + return got != check } diff --git a/sdks/go/pkg/beam/core/runtime/exec/timers_test.go b/sdks/go/pkg/beam/core/runtime/exec/timers_test.go index 2b3a33de792b..9745064589b5 100644 --- a/sdks/go/pkg/beam/core/runtime/exec/timers_test.go +++ b/sdks/go/pkg/beam/core/runtime/exec/timers_test.go @@ -17,11 +17,19 @@ package exec import ( "bytes" + "context" + "sort" "testing" + "time" "github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/coder" "github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/mtime" "github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/window" + "github.com/apache/beam/sdks/v2/go/pkg/beam/core/timers" + "github.com/apache/beam/sdks/v2/go/pkg/beam/core/typex" + "github.com/google/go-cmp/cmp" + "github.com/google/go-cmp/cmp/cmpopts" + "golang.org/x/exp/maps" ) func equalTimers(a, b TimerRecv) bool { @@ -41,33 +49,39 @@ func TestTimerEncodingDecoding(t *testing.T) { { name: "all set fields", tm: TimerRecv{ - Key: &FullValue{Elm: "Basic"}, - Tag: "first", - Windows: window.SingleGlobalWindow, - Clear: false, - 
FireTimestamp: mtime.Now(), + Key: &FullValue{Elm: "Basic"}, + TimerMap: timers.TimerMap{ + Tag: "first", + Clear: false, + FireTimestamp: mtime.Now(), + }, + Windows: window.SingleGlobalWindow, }, result: true, }, { name: "without tag", tm: TimerRecv{ - Key: &FullValue{Elm: "Basic"}, - Tag: "", - Windows: window.SingleGlobalWindow, - Clear: false, - FireTimestamp: mtime.Now(), + Key: &FullValue{Elm: "Basic"}, + TimerMap: timers.TimerMap{ + Tag: "", + Clear: false, + FireTimestamp: mtime.Now(), + }, + Windows: window.SingleGlobalWindow, }, result: true, }, { name: "with clear set", tm: TimerRecv{ - Key: &FullValue{Elm: "Basic"}, - Tag: "first", - Windows: window.SingleGlobalWindow, - Clear: true, - FireTimestamp: mtime.Now(), + Key: &FullValue{Elm: "Basic"}, + TimerMap: timers.TimerMap{ + Tag: "first", + Clear: true, + FireTimestamp: mtime.Now(), + }, + Windows: window.SingleGlobalWindow, }, result: false, }, @@ -91,5 +105,431 @@ func TestTimerEncodingDecoding(t *testing.T) { } }) } +} + +func TestTimerAdapter(t *testing.T) { + encodedKey := string([]byte{3}) + "key" + recv := func(tmap timers.TimerMap) TimerRecv { + var pane typex.PaneInfo + if !tmap.Clear { + pane = typex.NoFiringPane() + } + return TimerRecv{ + Key: &FullValue{Elm: "key"}, + KeyString: encodedKey, + Windows: window.SingleGlobalWindow, + Pane: pane, + TimerMap: tmap, + } + } + + tests := []struct { + name string + toSet []timers.TimerMap + want map[string][]TimerRecv // family to timers without family. 
+ }{ + { + name: "simple", + toSet: []timers.TimerMap{ + { + Family: "family1", + Tag: "", + Clear: false, + FireTimestamp: 123, + HoldTimestamp: 456, + }, { + Family: "family2", + Tag: "tag1", + Clear: false, + FireTimestamp: 123, + HoldTimestamp: 456, + }, + }, + want: map[string][]TimerRecv{ + "family1": { + recv(timers.TimerMap{ + Tag: "", + Clear: false, + FireTimestamp: 123, + HoldTimestamp: 456, + }), + }, + "family2": { + recv(timers.TimerMap{ + Tag: "tag1", + Clear: false, + FireTimestamp: 123, + HoldTimestamp: 456, + }), + }, + }, + }, { + name: "overwritten", + toSet: []timers.TimerMap{ + { + Family: "family1", + Tag: "", + Clear: false, + FireTimestamp: 123, + HoldTimestamp: 456, + }, { + Family: "family1", + Tag: "", + Clear: false, + FireTimestamp: 456, + HoldTimestamp: 789, + }, + }, + want: map[string][]TimerRecv{ + "family1": { + recv(timers.TimerMap{ + Tag: "", + Clear: false, + FireTimestamp: 456, + HoldTimestamp: 789, + }), + }, + }, + }, { + name: "cleared", + toSet: []timers.TimerMap{ + { + Family: "family1", + Tag: "", + Clear: false, + FireTimestamp: 123, + HoldTimestamp: 456, + }, { + Family: "family1", + Tag: "", + Clear: true, + }, + }, + want: map[string][]TimerRecv{ + "family1": { + recv(timers.TimerMap{ + Tag: "", + Clear: true, + }), + }, + }, + }, { + name: "tags separate", + toSet: []timers.TimerMap{ + { + Family: "family1", + Tag: "", + Clear: false, + FireTimestamp: 1, + HoldTimestamp: 2, + }, { + Family: "family1", + Tag: "tag1", + Clear: false, + FireTimestamp: 3, + HoldTimestamp: 4, + }, { + Family: "family1", + Tag: "tag2", + Clear: false, + FireTimestamp: 5, + HoldTimestamp: 6, + }, + }, + want: map[string][]TimerRecv{ + "family1": { + recv(timers.TimerMap{ + Tag: "", + Clear: false, + FireTimestamp: 1, + HoldTimestamp: 2, + }), recv(timers.TimerMap{ + Tag: "tag1", + Clear: false, + FireTimestamp: 3, + HoldTimestamp: 4, + }), recv(timers.TimerMap{ + Tag: "tag2", + Clear: false, + FireTimestamp: 5, + HoldTimestamp: 6, + }), + 
}, + }, + }, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + timerCoder := coder.NewT(coder.NewString(), coder.NewGlobalWindow()) + ta := newUserTimerAdapter(StreamID{PtransformID: "test"}, map[string]timerFamilySpec{ + "family1": newTimerFamilySpec(timers.EventTimeDomain, timerCoder), + "family2": newTimerFamilySpec(timers.ProcessingTimeDomain, timerCoder), + }) + + // Set the "key" + ta.SetCurrentKey(&MainInput{ + Key: FullValue{Elm: "key"}, + }) + + tp := ta.NewTimerProvider(typex.NoFiringPane(), window.SingleGlobalWindow) + for _, tmap := range test.toSet { + tp.Set(tmap) + } + + dm := &TestDataManager{} + ta.FlushAndReset(context.Background(), dm) + + if len(dm.TimerWrites) != len(test.want) { + t.Errorf("didn't receive writes for all expected families: got %v, want %v", maps.Keys(dm.TimerWrites), maps.Keys(test.want)) + } + for family, buf := range dm.TimerWrites { + r := bytes.NewBuffer(buf.Bytes()) + wantedTimers := test.want[family] + spec := ta.familyToSpec[family] + bundleTimers, err := decodeBundleTimers(spec, r) + if err != nil { + t.Fatalf("unable to decode timers for family %v: %v", family, err) + } + if diff := cmp.Diff(wantedTimers, bundleTimers, cmpopts.SortSlices( + func(a, b TimerRecv) bool { + if a.Tag != b.Tag { + return a.Tag < b.Tag + } + return a.FireTimestamp < b.FireTimestamp + }, + )); diff != "" { + t.Errorf("timer diff on family %v (-want,+got):\n%v", family, diff) + } + } + }) + } +} + +func TestSortableTimer_Less(t *testing.T) { + f := "family" + + now := mtime.FromTime(time.Now()) + + baseTimer := sortableTimer{ + Domain: timers.EventTimeDomain, + TimerMap: timers.TimerMap{ + Family: f, + Tag: "", + Clear: false, + FireTimestamp: now, + HoldTimestamp: now, + }, + } + eventTimer := baseTimer + processingTimer := baseTimer + processingTimer.Domain = timers.ProcessingTimeDomain + + clearedTimer := baseTimer + clearedTimer.Clear = true + + lesserFireTimer := baseTimer + lesserFireTimer.FireTimestamp -= 10 
+ greaterFireTimer := baseTimer + greaterFireTimer.FireTimestamp += 10 + + lesserHoldTimer := baseTimer + lesserHoldTimer.HoldTimestamp -= 10 + greaterHoldTimer := baseTimer + greaterHoldTimer.HoldTimestamp += 10 + + leastTagTimer := baseTimer + + lesserTagTimer := baseTimer + lesserTagTimer.Tag = "Bar" + + greaterTagTimer := baseTimer + greaterTagTimer.Tag = "Foo" + + tests := []struct { + name string + left, right sortableTimer + want bool + }{ + { + name: "equal ", + left: baseTimer, right: baseTimer, + want: false, + }, { + name: "processing time lesser", + left: processingTimer, right: eventTimer, + want: true, + }, { + name: "event time greater", + left: eventTimer, right: processingTimer, + want: false, + }, { + name: "cleared lesser", + left: clearedTimer, right: baseTimer, + want: true, + }, { + name: "uncleared greater", + left: baseTimer, right: clearedTimer, + want: false, + }, { + name: "greater firing time", + left: baseTimer, right: greaterFireTimer, + want: true, + }, { + name: "lesser firing time", + left: baseTimer, right: lesserFireTimer, + want: false, + }, { + name: "greater hold time", + left: baseTimer, right: greaterHoldTimer, + want: true, + }, { + name: "lesser hold time", + left: baseTimer, right: lesserHoldTimer, + want: false, + }, + } + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + if test.left.Less(test.right) != test.want { + t.Errorf("(%+v).Less(%+v) not true", test.left, test.right) + } + }) + } + + t.Run("sorted", func(t *testing.T) { + wantedOrder := timerHeap{ + processingTimer, + clearedTimer, + lesserFireTimer, + lesserHoldTimer, + leastTagTimer, // equal to base + baseTimer, // basic version of everything. + eventTimer, // equal to base + lesserTagTimer, + greaterTagTimer, + greaterHoldTimer, + greaterFireTimer, + } + if sort.IsSorted(wantedOrder) { + return // success! 
+ } + for i, v := range wantedOrder[1:] { + left, right := wantedOrder[i], v + if !left.Less(right) && left != right { + t.Errorf("%v \n%+v not < \n%+v", i, left, right) + } + } + }) + +} + +func TestTimerHeap_HeadSetIter(t *testing.T) { + f := "family" + + now := mtime.FromTime(time.Now()) + + nextTimer := sortableTimer{ + Domain: timers.EventTimeDomain, + TimerMap: timers.TimerMap{ + Family: f, + Tag: "", + Clear: false, + FireTimestamp: now, + HoldTimestamp: now, + }, + } + lesserFireTimer := nextTimer + lesserFireTimer.FireTimestamp -= 10 + greaterFireTimer := nextTimer + greaterFireTimer.FireTimestamp += 10 + tests := []struct { + name string + inserts []sortableTimer + key sortableTimer + want []sortableTimer + }{ + { + name: "empty", + inserts: nil, + key: nextTimer, + want: nil, + }, + { + name: "single-Greater", + inserts: []sortableTimer{greaterFireTimer}, + key: nextTimer, + want: nil, + }, + { + name: "single-Equal", + inserts: []sortableTimer{nextTimer}, + key: nextTimer, + want: []sortableTimer{nextTimer}, + }, + { + name: "single-Lesser", + inserts: []sortableTimer{lesserFireTimer}, + key: nextTimer, + want: []sortableTimer{lesserFireTimer}, + }, + { + name: "lessthan or equal", + inserts: []sortableTimer{lesserFireTimer, nextTimer, greaterFireTimer}, + key: nextTimer, + want: []sortableTimer{lesserFireTimer, nextTimer}, + }, + { + name: "lessthan or equal- different order", + inserts: []sortableTimer{greaterFireTimer, lesserFireTimer, nextTimer}, + key: nextTimer, + want: []sortableTimer{lesserFireTimer, nextTimer}, + }, + } + + // Test inserting everything at the same time. 
+ for _, test := range tests { + t.Run("singlebatch_"+test.name, func(t *testing.T) { + var h timerHeap + for _, timer := range test.inserts { + h.Add(timer) + } + iter := h.HeadSetIter(test.key) + + var got []sortableTimer + for { + if v, ok := iter(); ok { + got = append(got, v) + } else { + break + } + } + + if diff := cmp.Diff(test.want, got); diff != "" { + t.Errorf("h.HeadSet(%+v) diff (-want, +got):\n%v", test.key, diff) + } + }) + } + + // Test pulling after every insert, to validate the iterator is a dynamic view that reflects changes. + for _, test := range tests { + t.Run("dynamic_"+test.name, func(t *testing.T) { + var h timerHeap + + iter := h.HeadSetIter(test.key) + var got []sortableTimer + for _, timer := range test.inserts { + h.Add(timer) + + if v, ok := iter(); ok { + got = append(got, v) + } + } + + if diff := cmp.Diff(test.want, got); diff != "" { + t.Errorf("h.HeadSet(%+v) diff (-want, +got):\n%v", test.key, diff) + } + }) + } } diff --git a/sdks/go/pkg/beam/core/runtime/exec/translate.go b/sdks/go/pkg/beam/core/runtime/exec/translate.go index a8d1fdb7deee..65827d058387 100644 --- a/sdks/go/pkg/beam/core/runtime/exec/translate.go +++ b/sdks/go/pkg/beam/core/runtime/exec/translate.go @@ -27,6 +27,7 @@ import ( "github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/window" "github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/graphx" v1pb "github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/graphx/v1" + "github.com/apache/beam/sdks/v2/go/pkg/beam/core/timers" "github.com/apache/beam/sdks/v2/go/pkg/beam/core/typex" "github.com/apache/beam/sdks/v2/go/pkg/beam/core/util/protox" "github.com/apache/beam/sdks/v2/go/pkg/beam/internal/errors" @@ -592,12 +593,17 @@ func (b *builder) makeLink(from string, id linkID) (Node, error) { if len(userTimers) > 0 { sID := StreamID{Port: Port{URL: b.desc.GetTimerApiServiceDescriptor().GetUrl()}, PtransformID: id.to} - ec, wc, err := b.makeCoderForPCollection(input[0]) - if err != nil { - return nil, err + + 
familyToSpec := map[string]timerFamilySpec{} + for fam, spec := range userTimers { + domain := timers.TimeDomain(spec.GetTimeDomain()) + timerCoder, err := b.coders.Coder(spec.GetTimerFamilyCoderId()) + if err != nil { + return nil, errors.WithContextf(err, "couldn't retreive coder for timer %v in DoFn %v, ID %v", fam, dofn.Name(), n.PID) + } + familyToSpec[fam] = newTimerFamilySpec(domain, timerCoder) } - timerCoder := coder.NewT(ec.Components[0], wc) - n.Timer = NewUserTimerAdapter(sID, coder.NewW(ec, wc), timerCoder) + n.TimerTracker = newUserTimerAdapter(sID, familyToSpec) } for i := 1; i < len(input); i++ { diff --git a/sdks/go/pkg/beam/core/runtime/graphx/translate.go b/sdks/go/pkg/beam/core/runtime/graphx/translate.go index 6a5cb313d17e..ef46fe1e43da 100644 --- a/sdks/go/pkg/beam/core/runtime/graphx/translate.go +++ b/sdks/go/pkg/beam/core/runtime/graphx/translate.go @@ -582,15 +582,27 @@ func (m *marshaller) addMultiEdge(edge NamedEdge) ([]string, error) { m.requirements[URNRequiresStatefulProcessing] = true timerSpecs := make(map[string]*pipepb.TimerFamilySpec) pipelineTimers, _ := edge.Edge.DoFn.PipelineTimers() + + // All timers for a single DoFn have the same key and window coders, that match the input PCollection. + mainInputID := inputs["i0"] + pCol := m.pcollections[mainInputID] + kvCoder := m.coders.coders[pCol.CoderId] + if kvCoder.GetSpec().GetUrn() != urnKVCoder { + return nil, errors.Errorf("timer using DoFn %v doesn't use a KV as PCollection input. 
Unable to extract key coder for timers, got %v", edge.Name, kvCoder.GetSpec().GetUrn()) + } + keyCoderID := kvCoder.GetComponentCoderIds()[0] + + wsID := pCol.GetWindowingStrategyId() + ws := m.windowing[wsID] + windowCoderID := ws.GetWindowCoderId() + + timerCoderID := m.coders.internBuiltInCoder(urnTimerCoder, keyCoderID, windowCoderID) + for _, pt := range pipelineTimers { for timerFamilyID, timeDomain := range pt.Timers() { - coderID, err := m.coders.Add(edge.Edge.TimerCoders) - if err != nil { - return handleErr(err) - } timerSpecs[timerFamilyID] = &pipepb.TimerFamilySpec{ TimeDomain: pipepb.TimeDomain_Enum(timeDomain), - TimerFamilyCoderId: coderID, + TimerFamilyCoderId: timerCoderID, } } } @@ -958,11 +970,11 @@ func (m *marshaller) expandReshuffle(edge NamedEdge) (string, error) { if err != nil { return handleErr(err) } - coderID, err := makeWindowCoder(wfn) + windowCoder, err := makeWindowCoder(wfn) if err != nil { return handleErr(err) } - windowCoderID, err := m.coders.AddWindowCoder(coderID) + windowCoderID, err := m.coders.AddWindowCoder(windowCoder) if err != nil { return handleErr(err) } diff --git a/sdks/go/pkg/beam/core/timers/timers.go b/sdks/go/pkg/beam/core/timers/timers.go index ebffa215efa4..e05b414f5bf5 100644 --- a/sdks/go/pkg/beam/core/timers/timers.go +++ b/sdks/go/pkg/beam/core/timers/timers.go @@ -42,6 +42,8 @@ const ( ) // TimerMap holds timer information obtained from the pipeline. +// +// For SDK internal use, and subject to change. type TimerMap struct { Family string Tag string @@ -70,7 +72,15 @@ func WithOutputTimestamp(outputTimestamp time.Time) timerOptions { } } +// Context is a parameter for OnTimer methods to receive the fired Timer. +type Context struct { + Family string + Tag string +} + // Provider represents a timer provider interface. +// +// The methods are not intended for end user use, and is subject to change. 
type Provider interface { Set(t TimerMap) } @@ -92,7 +102,7 @@ func (et EventTime) Timers() map[string]TimeDomain { // Set sets the timer for a event-time timestamp. Calling this method repeatedly for the same key // will cause it overwrite previously set timer. -func (et *EventTime) Set(p Provider, FiringTimestamp time.Time, opts ...timerOptions) { +func (et EventTime) Set(p Provider, FiringTimestamp time.Time, opts ...timerOptions) { tc := timerConfig{} for _, opt := range opts { opt(&tc) @@ -105,10 +115,15 @@ func (et *EventTime) Set(p Provider, FiringTimestamp time.Time, opts ...timerOpt } // Clear clears this timer. -func (et *EventTime) Clear(p Provider) { +func (et EventTime) Clear(p Provider) { p.Set(TimerMap{Family: et.Family, Clear: true}) } +// ClearTag clears this timer for the given tag. +func (pt EventTime) ClearTag(p Provider, tag string) { + p.Set(TimerMap{Family: pt.Family, Clear: true, Tag: tag}) +} + // ProcessingTime represents the processing time timer. type ProcessingTime struct { Family string @@ -121,7 +136,7 @@ func (pt ProcessingTime) Timers() map[string]TimeDomain { // Set sets the timer for processing time domain. Calling this method repeatedly for the same key // will cause it overwrite previously set timer. -func (pt *ProcessingTime) Set(p Provider, FiringTimestamp time.Time, opts ...timerOptions) { +func (pt ProcessingTime) Set(p Provider, FiringTimestamp time.Time, opts ...timerOptions) { tc := timerConfig{} for _, opt := range opts { opt(&tc) @@ -139,12 +154,17 @@ func (pt ProcessingTime) Clear(p Provider) { p.Set(TimerMap{Family: pt.Family, Clear: true}) } +// ClearTag clears this timer for the given tag. +func (pt ProcessingTime) ClearTag(p Provider, tag string) { + p.Set(TimerMap{Family: pt.Family, Clear: true, Tag: tag}) +} + // InEventTime creates and returns a new EventTime timer object. 
-func InEventTime(Key string) EventTime { - return EventTime{Family: Key} +func InEventTime(family string) EventTime { + return EventTime{Family: family} } // InProcessingTime creates and returns a new ProcessingTime timer object. -func InProcessingTime(Key string) ProcessingTime { - return ProcessingTime{Family: Key} +func InProcessingTime(family string) ProcessingTime { + return ProcessingTime{Family: family} } diff --git a/sdks/go/pkg/beam/pardo.go b/sdks/go/pkg/beam/pardo.go index 031b62f335c8..d18945834d6d 100644 --- a/sdks/go/pkg/beam/pardo.go +++ b/sdks/go/pkg/beam/pardo.go @@ -116,17 +116,6 @@ func TryParDo(s Scope, dofn any, col PCollection, opts ...Option) ([]PCollection } } - wc := inWfn.Coder() - pipelineTimers, _ := fn.PipelineTimers() - if len(pipelineTimers) > 0 { - c, err := inferCoder(typex.New(reflect.TypeOf(col.Type()))) - if err != nil { - return nil, addParDoCtx(errors.New("error infering coder from col"), s) - } - tc := coder.NewT(c, wc) - edge.TimerCoders = tc - } - var ret []PCollection for _, out := range edge.Output { c := PCollection{out.To}