Skip to content

Commit

Permalink
[apache#28187][prism] worker shutdown, cleanup, log fail, port spec, grpc recv size (apache#28184)
Browse files Browse the repository at this point in the history

* [prism] worker shutdown, cleanup, log fail

* Increase prism server receive size to max.
  • Loading branch information
lostluck committed Aug 30, 2023
1 parent 70462fb commit 77e658d
Show file tree
Hide file tree
Showing 5 changed files with 29 additions and 5 deletions.
10 changes: 10 additions & 0 deletions sdks/go/pkg/beam/runners/prism/internal/execute.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"fmt"
"io"
"sort"
"time"

"github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/mtime"
"github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/exec"
Expand Down Expand Up @@ -55,6 +56,15 @@ func RunPipeline(j *jobservices.Job) {
env, _ := getOnlyPair(envs)
wk := worker.New(env) // Cheating by having the worker id match the environment id.
go wk.Serve()
timeout := time.Minute
time.AfterFunc(timeout, func() {
if wk.Connected() {
return
}
err := fmt.Errorf("prism %v didn't get control connection after %v", wk, timeout)
j.Failed(err)
j.CancelFn(err)
})

// When this function exits, we cancel the context to clear
// any related job resources.
Expand Down
1 change: 1 addition & 0 deletions sdks/go/pkg/beam/runners/prism/internal/jobservices/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,7 @@ func (j *Job) Done() {

// Failed indicates that the job completed unsuccessfully.
func (j *Job) Failed(err error) {
slog.Error("job failed", slog.Any("job", j), slog.Any("error", err))
j.failureErr = err
j.sendState(jobpb.JobState_FAILED)
j.CancelFn(err)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,8 @@ func (s *Server) getJob(id string) *Job {
}

// Endpoint returns the address of the server's listener rewritten as
// "localhost:<port>", keeping only the port the listener is bound to.
//
// If the listener address cannot be split into host and port, the raw
// listener address is returned unchanged rather than a malformed
// "localhost:" string.
func (s *Server) Endpoint() string {
	addr := s.lis.Addr().String()
	_, port, err := net.SplitHostPort(addr)
	if err != nil {
		// Best effort: hand back whatever the listener reported.
		return addr
	}
	return fmt.Sprintf("localhost:%v", port)
}

// Serve serves on the started listener. Blocks.
Expand Down
2 changes: 1 addition & 1 deletion sdks/go/pkg/beam/runners/prism/internal/preprocess.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ func (p *preprocessor) preProcessGraph(comps *pipepb.Components) []*stage {
keptLeaves := maps.Keys(leaves)
sort.Strings(keptLeaves)
topological := pipelinex.TopologicalSort(ts, keptLeaves)
slog.Debug("topological transform ordering", topological)
slog.Debug("topological transform ordering", slog.Any("topological", topological))

// Basic Fusion Behavior
//
Expand Down
18 changes: 15 additions & 3 deletions sdks/go/pkg/beam/runners/prism/internal/worker/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"context"
"fmt"
"io"
"math"
"net"
"strconv"
"strings"
Expand Down Expand Up @@ -62,6 +63,7 @@ type W struct {

// These are the ID sources
inst, bund uint64
connected atomic.Bool

InstReqs chan *fnpb.InstructionRequest
DataReqs chan *fnpb.Elements
Expand All @@ -83,7 +85,9 @@ func New(id string) *W {
if err != nil {
panic(fmt.Sprintf("failed to listen: %v", err))
}
var opts []grpc.ServerOption
opts := []grpc.ServerOption{
grpc.MaxRecvMsgSize(math.MaxInt32),
}
wk := &W{
ID: id,
lis: lis,
Expand All @@ -106,7 +110,8 @@ func New(id string) *W {
}

// Endpoint returns the address of the worker's listener rewritten as
// "localhost:<port>", keeping only the port the listener is bound to.
//
// If the listener address cannot be split into host and port, the raw
// listener address is returned unchanged rather than a malformed
// "localhost:" string.
func (wk *W) Endpoint() string {
	addr := wk.lis.Addr().String()
	_, port, err := net.SplitHostPort(addr)
	if err != nil {
		// Best effort: hand back whatever the listener reported.
		return addr
	}
	return fmt.Sprintf("localhost:%v", port)
}

// Serve serves on the started listener. Blocks.
Expand Down Expand Up @@ -200,7 +205,7 @@ func (wk *W) Logging(stream fnpb.BeamFnLogging_LoggingServer) error {
file = file[:i]
}

slog.LogAttrs(context.TODO(), toSlogSev(l.GetSeverity()), l.GetMessage(),
slog.LogAttrs(stream.Context(), toSlogSev(l.GetSeverity()), l.GetMessage(),
slog.Any(slog.SourceKey, &slog.Source{
File: file,
Line: line,
Expand Down Expand Up @@ -241,10 +246,15 @@ func (wk *W) GetProcessBundleDescriptor(ctx context.Context, req *fnpb.GetProces
return desc, nil
}

// Connected reports the current value of the worker's connected flag.
// The flag is read atomically, so it may be called from any goroutine.
func (wk *W) Connected() bool {
	isConnected := wk.connected.Load()
	return isConnected
}

// Control relays instructions to SDKs and back again, coordinated via unique instructionIDs.
//
// Requests come from the runner, and are sent to the client in the SDK.
func (wk *W) Control(ctrl fnpb.BeamFnControl_ControlServer) error {
wk.connected.Store(true)
done := make(chan struct{})
go func() {
for {
Expand Down Expand Up @@ -281,10 +291,12 @@ func (wk *W) Control(ctrl fnpb.BeamFnControl_ControlServer) error {
case req := <-wk.InstReqs:
err := ctrl.Send(req)
if err != nil {
go func() { <-done }()
return err
}
case <-ctrl.Context().Done():
slog.Debug("Control context canceled")
go func() { <-done }()
return ctrl.Context().Err()
case <-done:
slog.Debug("Control done")
Expand Down

0 comments on commit 77e658d

Please sign in to comment.