Skip to content

Commit

Permalink
Move provisioned options outside of harness.Main (apache#26476)
Browse files Browse the repository at this point in the history
Co-authored-by: lostluck <[email protected]>
  • Loading branch information
lostluck and lostluck committed May 1, 2023
1 parent 63c9d7e commit 5e3e56b
Show file tree
Hide file tree
Showing 3 changed files with 54 additions and 16 deletions.
35 changes: 21 additions & 14 deletions sdks/go/pkg/beam/core/runtime/harness/harness.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,25 +45,32 @@ import (
// URNMonitoringInfoShortID is a URN indicating support for short monitoring info IDs.
const URNMonitoringInfoShortID = "beam:protocol:monitoring_info_short_ids:v1"

// TODO(herohde) 2/8/2017: for now, assume we stage a full binary (not a plugin).
// Options for harness.Main that affect execution of the harness, such as runner capabilities.
type Options struct {
RunnerCapabilities []string // URNs for what runners are able to understand over the FnAPI.
StatusEndpoint string // Endpoint for worker status reporting.
}

// Main is the main entrypoint for the Go harness. It runs at "runtime" -- not
// MainWithOptions is the main entrypoint for the Go harness. It runs at "runtime" -- not
// "pipeline-construction time" -- on each worker. It is a FnAPI client and
// ultimately responsible for correctly executing user code.
//
// Deprecated: Prefer MainWithOptions instead.
func Main(ctx context.Context, loggingEndpoint, controlEndpoint string) error {
return MainWithOptions(ctx, loggingEndpoint, controlEndpoint, Options{})
}

// MainWithOptions is the main entrypoint for the Go harness. It runs at "runtime" -- not
// "pipeline-construction time" -- on each worker. It is a FnAPI client and
// ultimately responsible for correctly executing user code.
//
// Options are optional configurations for interfacing with the runner or similar.
func MainWithOptions(ctx context.Context, loggingEndpoint, controlEndpoint string, opts Options) error {
hooks.DeserializeHooksFromOptions(ctx)

// Extract environment variables. These are optional runner supported capabilities.
// Expected env variables:
// RUNNER_CAPABILITIES : list of runner supported capability urn.
// STATUS_ENDPOINT : Endpoint to connect to status server used for worker status reporting.
statusEndpoint := os.Getenv("STATUS_ENDPOINT")
runnerCapabilities := strings.Split(os.Getenv("RUNNER_CAPABILITIES"), " ")
rcMap := make(map[string]bool)
if len(runnerCapabilities) > 0 {
for _, capability := range runnerCapabilities {
rcMap[capability] = true
}
for _, capability := range opts.RunnerCapabilities {
rcMap[capability] = true
}

// Pass in the logging endpoint for use w/the default remote logging hook.
Expand Down Expand Up @@ -151,8 +158,8 @@ func Main(ctx context.Context, loggingEndpoint, controlEndpoint string) error {
}

// if the runner supports worker status api then expose SDK harness status
if statusEndpoint != "" {
statusHandler, err := newWorkerStatusHandler(ctx, statusEndpoint, ctrl.cache, func(statusInfo *strings.Builder) { ctrl.metStoreToString(statusInfo) })
if opts.StatusEndpoint != "" {
statusHandler, err := newWorkerStatusHandler(ctx, opts.StatusEndpoint, ctrl.cache, func(statusInfo *strings.Builder) { ctrl.metStoreToString(statusInfo) })
if err != nil {
log.Errorf(ctx, "error establishing connection to worker status API: %v", err)
} else {
Expand Down
14 changes: 13 additions & 1 deletion sdks/go/pkg/beam/core/runtime/harness/init/init.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"context"
"encoding/json"
"flag"
"strings"
"time"

"fmt"
Expand Down Expand Up @@ -118,7 +119,18 @@ func hook() {
if err := syscallx.SetProcessMemoryCeiling(memLimit, memLimit); err != nil && err != syscallx.ErrUnsupported {
fmt.Println("Error Setting Rlimit ", err)
}
if err := harness.Main(ctx, *loggingEndpoint, *controlEndpoint); err != nil {

// Extract environment variables. These are optional runner supported capabilities.
// Expected env variables:
// RUNNER_CAPABILITIES : list of runner supported capability urn.
// STATUS_ENDPOINT : Endpoint to connect to status server used for worker status reporting.
statusEndpoint := os.Getenv("STATUS_ENDPOINT")
runnerCapabilities := strings.Split(os.Getenv("RUNNER_CAPABILITIES"), " ")
options := harness.Options{
StatusEndpoint: statusEndpoint,
RunnerCapabilities: runnerCapabilities,
}
if err := harness.MainWithOptions(ctx, *loggingEndpoint, *controlEndpoint, options); err != nil {
fmt.Fprintf(os.Stderr, "Worker failed: %v\n", err)
switch ShutdownMode {
case Terminate:
Expand Down
21 changes: 20 additions & 1 deletion sdks/go/pkg/beam/runners/universal/extworker/extworker.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"net"
"sync"

"github.com/apache/beam/sdks/v2/go/container/tools"
"github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/harness"
"github.com/apache/beam/sdks/v2/go/pkg/beam/log"
fnpb "github.com/apache/beam/sdks/v2/go/pkg/beam/model/fnexecution_v1"
Expand Down Expand Up @@ -89,10 +90,28 @@ func (s *Loopback) StartWorker(ctx context.Context, req *fnpb.StartWorkerRequest
ctx = grpcx.WriteWorkerID(s.root, req.GetWorkerId())
ctx, s.workers[req.GetWorkerId()] = context.WithCancel(ctx)

go harness.Main(ctx, req.GetLoggingEndpoint().GetUrl(), req.GetControlEndpoint().GetUrl())
opts := harnessOptions(ctx, req.GetProvisionEndpoint().GetUrl())

go harness.MainWithOptions(ctx, req.GetLoggingEndpoint().GetUrl(), req.GetControlEndpoint().GetUrl(), opts)
return &fnpb.StartWorkerResponse{}, nil
}

func harnessOptions(ctx context.Context, endpoint string) harness.Options {
var opts harness.Options
if endpoint == "" {
return opts
}
info, err := tools.ProvisionInfo(ctx, endpoint)
if err != nil {
log.Infof(ctx, "error talking to provision service worker, using defaults:%v", err)
return opts
}

opts.StatusEndpoint = info.GetStatusEndpoint().GetUrl()
opts.RunnerCapabilities = info.GetRunnerCapabilities()
return opts
}

// StopWorker terminates a worker harness, implementing BeamFnExternalWorkerPoolServer.StopWorker.
func (s *Loopback) StopWorker(ctx context.Context, req *fnpb.StopWorkerRequest) (*fnpb.StopWorkerResponse, error) {
log.Infof(ctx, "stopping worker %v", req.GetWorkerId())
Expand Down

0 comments on commit 5e3e56b

Please sign in to comment.