Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Pass provisioned options via struct. #26476

Merged
merged 1 commit into from
May 1, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 21 additions & 14 deletions sdks/go/pkg/beam/core/runtime/harness/harness.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,25 +45,32 @@ import (
// URNMonitoringInfoShortID is a URN indicating support for short monitoring info IDs.
const URNMonitoringInfoShortID = "beam:protocol:monitoring_info_short_ids:v1"

// TODO(herohde) 2/8/2017: for now, assume we stage a full binary (not a plugin).
// Options for harness.Main that affect execution of the harness, such as runner capabilities.
type Options struct {
RunnerCapabilities []string // URNs for what runners are able to understand over the FnAPI.
StatusEndpoint string // Endpoint for worker status reporting.
}

// Main is the main entrypoint for the Go harness. It runs at "runtime" -- not
// MainWithOptions is the main entrypoint for the Go harness. It runs at "runtime" -- not
// "pipeline-construction time" -- on each worker. It is a FnAPI client and
// ultimately responsible for correctly executing user code.
//
// Deprecated: Prefer MainWithOptions instead.
func Main(ctx context.Context, loggingEndpoint, controlEndpoint string) error {
return MainWithOptions(ctx, loggingEndpoint, controlEndpoint, Options{})
}

// MainWithOptions is the main entrypoint for the Go harness. It runs at "runtime" -- not
// "pipeline-construction time" -- on each worker. It is a FnAPI client and
// ultimately responsible for correctly executing user code.
//
// Options are optional configurations for interfacing with the runner or similar.
func MainWithOptions(ctx context.Context, loggingEndpoint, controlEndpoint string, opts Options) error {
hooks.DeserializeHooksFromOptions(ctx)

// Extract environment variables. These are optional runner supported capabilities.
// Expected env variables:
// RUNNER_CAPABILITIES : list of runner supported capability urn.
// STATUS_ENDPOINT : Endpoint to connect to status server used for worker status reporting.
statusEndpoint := os.Getenv("STATUS_ENDPOINT")
runnerCapabilities := strings.Split(os.Getenv("RUNNER_CAPABILITIES"), " ")
rcMap := make(map[string]bool)
if len(runnerCapabilities) > 0 {
for _, capability := range runnerCapabilities {
rcMap[capability] = true
}
for _, capability := range opts.RunnerCapabilities {
rcMap[capability] = true
}

// Pass in the logging endpoint for use w/the default remote logging hook.
Expand Down Expand Up @@ -151,8 +158,8 @@ func Main(ctx context.Context, loggingEndpoint, controlEndpoint string) error {
}

// if the runner supports worker status api then expose SDK harness status
if statusEndpoint != "" {
statusHandler, err := newWorkerStatusHandler(ctx, statusEndpoint, ctrl.cache, func(statusInfo *strings.Builder) { ctrl.metStoreToString(statusInfo) })
if opts.StatusEndpoint != "" {
statusHandler, err := newWorkerStatusHandler(ctx, opts.StatusEndpoint, ctrl.cache, func(statusInfo *strings.Builder) { ctrl.metStoreToString(statusInfo) })
if err != nil {
log.Errorf(ctx, "error establishing connection to worker status API: %v", err)
} else {
Expand Down
14 changes: 13 additions & 1 deletion sdks/go/pkg/beam/core/runtime/harness/init/init.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"context"
"encoding/json"
"flag"
"strings"
"time"

"fmt"
Expand Down Expand Up @@ -118,7 +119,18 @@ func hook() {
if err := syscallx.SetProcessMemoryCeiling(memLimit, memLimit); err != nil && err != syscallx.ErrUnsupported {
fmt.Println("Error Setting Rlimit ", err)
}
if err := harness.Main(ctx, *loggingEndpoint, *controlEndpoint); err != nil {

// Extract environment variables. These are optional runner supported capabilities.
// Expected env variables:
// RUNNER_CAPABILITIES : list of runner supported capability urn.
// STATUS_ENDPOINT : Endpoint to connect to status server used for worker status reporting.
statusEndpoint := os.Getenv("STATUS_ENDPOINT")
runnerCapabilities := strings.Split(os.Getenv("RUNNER_CAPABILITIES"), " ")
options := harness.Options{
StatusEndpoint: statusEndpoint,
RunnerCapabilities: runnerCapabilities,
}
if err := harness.MainWithOptions(ctx, *loggingEndpoint, *controlEndpoint, options); err != nil {
fmt.Fprintf(os.Stderr, "Worker failed: %v\n", err)
switch ShutdownMode {
case Terminate:
Expand Down
21 changes: 20 additions & 1 deletion sdks/go/pkg/beam/runners/universal/extworker/extworker.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"net"
"sync"

"github.com/apache/beam/sdks/v2/go/container/tools"
"github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/harness"
"github.com/apache/beam/sdks/v2/go/pkg/beam/log"
fnpb "github.com/apache/beam/sdks/v2/go/pkg/beam/model/fnexecution_v1"
Expand Down Expand Up @@ -89,10 +90,28 @@ func (s *Loopback) StartWorker(ctx context.Context, req *fnpb.StartWorkerRequest
ctx = grpcx.WriteWorkerID(s.root, req.GetWorkerId())
ctx, s.workers[req.GetWorkerId()] = context.WithCancel(ctx)

go harness.Main(ctx, req.GetLoggingEndpoint().GetUrl(), req.GetControlEndpoint().GetUrl())
opts := harnessOptions(ctx, req.GetProvisionEndpoint().GetUrl())

go harness.MainWithOptions(ctx, req.GetLoggingEndpoint().GetUrl(), req.GetControlEndpoint().GetUrl(), opts)
return &fnpb.StartWorkerResponse{}, nil
}

func harnessOptions(ctx context.Context, endpoint string) harness.Options {
var opts harness.Options
if endpoint == "" {
return opts
}
info, err := tools.ProvisionInfo(ctx, endpoint)
if err != nil {
log.Infof(ctx, "error talking to provision service worker, using defaults:%v", err)
return opts
}

opts.StatusEndpoint = info.GetStatusEndpoint().GetUrl()
opts.RunnerCapabilities = info.GetRunnerCapabilities()
return opts
}

// StopWorker terminates a worker harness, implementing BeamFnExternalWorkerPoolServer.StopWorker.
func (s *Loopback) StopWorker(ctx context.Context, req *fnpb.StopWorkerRequest) (*fnpb.StopWorkerResponse, error) {
log.Infof(ctx, "stopping worker %v", req.GetWorkerId())
Expand Down