[Feature Request]: Prism Support for Timer and ProcessingTime #31177

Closed
1 of 16 tasks
SamStentz opened this issue May 3, 2024 · 6 comments

Comments

@SamStentz

What would you like to happen?

I want to be able to test processing-time timers on the Prism runner with the Go SDK. Currently the job fails to prepare with:

Failed to execute job: job failed to prepare
        	caused by:
        rpc error: code = Unknown desc = found 1 uses of features unimplemented in prism in job go-job-1-1714766720657045849:
        unsupported feature "TimerFamilySpecs.TimeDomain.Urn" set with value PROCESSING_TIME

Issue Priority

Priority: 2 (default / most feature requests should be filed as P2)

Issue Components

  • Component: Python SDK
  • Component: Java SDK
  • Component: Go SDK
  • Component: Typescript SDK
  • Component: IO connector
  • Component: Beam YAML
  • Component: Beam examples
  • Component: Beam playground
  • Component: Beam katas
  • Component: Website
  • Component: Spark Runner
  • Component: Flink Runner
  • Component: Samza Runner
  • Component: Twister2 Runner
  • Component: Hazelcast Jet Runner
  • Component: Google Cloud Dataflow Runner
@SamStentz
Author

I generally just want to be able to test timers locally with any runner; I'm not picky about which one.

The file I am trying to run:

package poc

import (
	"context"
	"time"

	"github.com/apache/beam/sdks/v2/go/pkg/beam"
	"github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime"
	"github.com/apache/beam/sdks/v2/go/pkg/beam/core/state"
	"github.com/apache/beam/sdks/v2/go/pkg/beam/core/timers"
	"github.com/apache/beam/sdks/v2/go/pkg/beam/log"
	"github.com/verily-src/verily1/ingestion/internal/beamutil"
)

func init() {
	runtime.RegisterFunction(DoNothingTransform)
	beamutil.RegisterTypePointer[doNothingTransformFn]()
	beamutil.RegisterTypePointer[context.Context]()
}

type doNothingTransformFn struct {
	TimerCount     state.Value[int64]
	ProcessingTime timers.ProcessingTime
}

func NewDoNothingTransform() *doNothingTransformFn {
	return &doNothingTransformFn{
		TimerCount:     state.MakeValueState[int64]("timerCount"),
		ProcessingTime: timers.InProcessingTime("processingTime"),
	}
}

func (fn *doNothingTransformFn) Setup(ctx context.Context) {
}

func (fn *doNothingTransformFn) ProcessElement(ctx context.Context,
	sp state.Provider, tp timers.Provider,
	k int,
	v string,
	emit func(string)) {
	log.Infof(ctx, "ProcessElement <%d, %s>", k, v)
	// Set timer for 1 second processing time.
	tn := time.Now()
	fn.ProcessingTime.Set(tp, tn.Add(time.Second))
	err := fn.TimerCount.Write(sp, 0)
	if err != nil {
		log.Errorf(ctx, "error writing timer time: %v", err)
	}
}

func (fn *doNothingTransformFn) OnTimer(ctx context.Context,
	sp state.Provider, tp timers.Provider,
	k int, timer timers.Context,
	v string,
	emit func(string)) {
	// Read state.
	tc, _, err := fn.TimerCount.Read(sp)
	if err != nil {
		log.Errorf(ctx, "error reading timer time: %v", err)
		return
	}
	log.Infof(ctx, "OnTimer <%d, %s>: count %d", k, v, tc)
	// Terminate at 5 iterations: emit the value and stop rescheduling the timer.
	if tc == 5 {
		emit(v)
		return
	}
	// Set timer for 1 second processing time.
	tn := time.Now()
	fn.ProcessingTime.Set(tp, tn.Add(time.Second))
	err = fn.TimerCount.Write(sp, tc+1)
	if err != nil {
		log.Errorf(ctx, "error writing timer time: %v", err)
	}
}

func (fn *doNothingTransformFn) Teardown(ctx context.Context) {
}

func DoNothingTransform(scope beam.Scope, in beam.PCollection) beam.PCollection {
	return beam.ParDo(scope.Scope("DoNothing"), NewDoNothingTransform(), in)
}

@SamStentz
Author

I found this capability description, but I don't see Prism listed there or anywhere else, and I have no idea whether I need to file this against Prism somewhere else.

@SamStentz
Author

Likely duplicate of #30083

@lostluck lostluck self-assigned this May 3, 2024
@lostluck
Contributor

lostluck commented May 3, 2024

Thank you for filing this issue!

Processing time support is currently in progress for Prism, so this is a duplicate. Getting time right turns out to be tricky, so it has taken longer than desired to complete.

You're right, follow #30083 for current progress, and #30492 for the current WIP PR for getting test execution working (before actually hooking in a real clock).

#29650 is the current umbrella tracker for Prism. That'll eventually be migrated to the capabilities page. There's a goal for the next release (2.57.0) to have Prism binaries produced as part of the release, so that 2.58.0 can add easy access to Prism in the Java and Python (and TypeScript, Swift, and Rust?) SDKs.

I'll close this issue once I've understood the full request and made sure your expectations and our plans are properly aligned.

@lostluck
Contributor

Basic "test focused" ProcessingTime handling is merged in with #30492. Unfortunately, due to other tasks I had to focus on, I didn't manage to get the real-time clock handling in yet. But the current setup should be sufficient for "unit test" execution.

Processing time timers won't automatically fail the pipeline with 2.57.0, but they also won't wait around for the real time to occur. They will be processed in the correct order with the correct restrictions though (only one scheduled per key per timer per window per tag, etc.). Unbounded SplittableDoFns (with ProcessContinuations) don't schedule into the processing time system yet, but that will also be worked on when adding "real time".

Using a TestStream as an input into the pipeline will allow control over the synthetic processing time clock, and will hopefully be sufficient to test things out with some semblance of ordering.

Triggers remain unimplemented, so ProcessingTime triggers remain a failure case for such pipelines.

@SamStentz
Author

Closing since this is completed!

@github-actions github-actions bot added this to the 2.60.0 Release milestone Sep 30, 2024