Skip to content

Commit

Permalink
[apache#31403] Relax prism constraints to allow python wordcount to e…
Browse files Browse the repository at this point in the history
…xecute. (apache#31644)

Co-authored-by: lostluck <[email protected]>
  • Loading branch information
2 people authored and reeba212 committed Dec 4, 2024
1 parent 3788a75 commit 04b49f1
Showing 1 changed file with 60 additions and 7 deletions.
67 changes: 60 additions & 7 deletions sdks/go/pkg/beam/runners/prism/internal/jobservices/management.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,9 @@ func (s *Server) Prepare(ctx context.Context, req *jobpb.PrepareJobRequest) (*jo
urns.TransformCombinePerKey,
urns.TransformCombineGlobally, // Used by Java SDK
urns.TransformCombineGroupedValues, // Used by Java SDK
urns.TransformMerge, // Used directly by Python SDK if "pre-optimized"
urns.TransformPreCombine, // Used directly by Python SDK if "pre-optimized"
urns.TransformExtract, // Used directly by Python SDK if "pre-optimized"
urns.TransformAssignWindows:
// Very few expected transforms types for submitted pipelines.
// Most URNs are for the runner to communicate back to the SDK for execution.
Expand Down Expand Up @@ -165,12 +168,6 @@ func (s *Server) Prepare(ctx context.Context, req *jobpb.PrepareJobRequest) (*jo

check("OnWindowExpirationTimerFamily", pardo.GetOnWindowExpirationTimerFamilySpec(), "") // Unsupported for now.

case "":
// Composites can often have no spec
if len(t.GetSubtransforms()) > 0 {
continue
}
fallthrough
case urns.TransformTestStream:
var testStream pipepb.TestStreamPayload
if err := proto.Unmarshal(t.GetSpec().GetPayload(), &testStream); err != nil {
Expand All @@ -179,7 +176,15 @@ func (s *Server) Prepare(ctx context.Context, req *jobpb.PrepareJobRequest) (*jo

t.EnvironmentId = "" // Unset the environment, to ensure it's handled prism side.
testStreamIds = append(testStreamIds, tid)

default:
// Composites can often have some unknown urn, permit those.
// Eg. The Python SDK has urns "beam:transform:generic_composite:v1", "beam:transform:pickled_python:v1", as well as the deprecated "beam:transform:read:v1",
// but they are composites. Since we don't do anything special with the high level, we simply use their internal subgraph.
if len(t.GetSubtransforms()) > 0 {
continue
}
// But if not, fail.
check("PTransform.Spec.Urn", urn+" "+t.GetUniqueName(), "<doesn't exist>")
}
}
Expand All @@ -191,7 +196,8 @@ func (s *Server) Prepare(ctx context.Context, req *jobpb.PrepareJobRequest) (*jo
// Inspect Windowing strategies for unsupported features.
for wsID, ws := range job.Pipeline.GetComponents().GetWindowingStrategies() {
check("WindowingStrategy.AllowedLateness", ws.GetAllowedLateness(), int64(0))
check("WindowingStrategy.ClosingBehaviour", ws.GetClosingBehavior(), pipepb.ClosingBehavior_EMIT_IF_NONEMPTY)
// Both Closing behaviors are identical without additional trigger firings.
check("WindowingStrategy.ClosingBehaviour", ws.GetClosingBehavior(), pipepb.ClosingBehavior_EMIT_IF_NONEMPTY, pipepb.ClosingBehavior_EMIT_ALWAYS)
check("WindowingStrategy.AccumulationMode", ws.GetAccumulationMode(), pipepb.AccumulationMode_DISCARDING)
if ws.GetWindowFn().GetUrn() != urns.WindowFnSession {
check("WindowingStrategy.MergeStatus", ws.GetMergeStatus(), pipepb.MergeStatus_NON_MERGING)
Expand Down Expand Up @@ -398,3 +404,50 @@ func (s *Server) GetState(_ context.Context, req *jobpb.GetJobStateRequest) (*jo
Timestamp: timestamppb.New(j.stateTime),
}, nil
}

// DescribePipelineOptions is a no-op since it's unclear how it is to function.
// Apparently only implemented in the Python SDK.
func (s *Server) DescribePipelineOptions(context.Context, *jobpb.DescribePipelineOptionsRequest) (*jobpb.DescribePipelineOptionsResponse, error) {
return &jobpb.DescribePipelineOptionsResponse{
Options: []*jobpb.PipelineOptionDescriptor{},
}, nil
}

// GetStateStream returns the job state as it changes.
func (s *Server) GetStateStream(req *jobpb.GetJobStateRequest, stream jobpb.JobService_GetStateStreamServer) error {
s.mu.Lock()
job, ok := s.jobs[req.GetJobId()]
s.mu.Unlock()
if !ok {
return fmt.Errorf("job with id %v not found", req.GetJobId())
}

job.streamCond.L.Lock()
defer job.streamCond.L.Unlock()

state := job.state.Load().(jobpb.JobState_Enum)
for {
job.streamCond.L.Unlock()
stream.Send(&jobpb.JobStateEvent{
State: state,
Timestamp: timestamppb.Now(),
})
job.streamCond.L.Lock()
switch state {
case jobpb.JobState_CANCELLED, jobpb.JobState_DONE, jobpb.JobState_DRAINED, jobpb.JobState_UPDATED, jobpb.JobState_FAILED:
// Reached terminal state.
return nil
}
newState := job.state.Load().(jobpb.JobState_Enum)
for state == newState {
select { // Quit out if the external connection is done.
case <-stream.Context().Done():
return context.Cause(stream.Context())
default:
}
job.streamCond.Wait()
newState = job.state.Load().(jobpb.JobState_Enum)
}
state = newState
}
}

0 comments on commit 04b49f1

Please sign in to comment.