diff --git a/sdks/go/pkg/beam/core/runtime/harness/harness.go b/sdks/go/pkg/beam/core/runtime/harness/harness.go index c7e6a9b4db24..69c178b7e4cf 100644 --- a/sdks/go/pkg/beam/core/runtime/harness/harness.go +++ b/sdks/go/pkg/beam/core/runtime/harness/harness.go @@ -45,25 +45,32 @@ import ( // URNMonitoringInfoShortID is a URN indicating support for short monitoring info IDs. const URNMonitoringInfoShortID = "beam:protocol:monitoring_info_short_ids:v1" -// TODO(herohde) 2/8/2017: for now, assume we stage a full binary (not a plugin). +// Options for harness.Main that affect execution of the harness, such as runner capabilities. +type Options struct { + RunnerCapabilities []string // URNs for what runners are able to understand over the FnAPI. + StatusEndpoint string // Endpoint for worker status reporting. +} -// Main is the main entrypoint for the Go harness. It runs at "runtime" -- not +// MainWithOptions is the main entrypoint for the Go harness. It runs at "runtime" -- not // "pipeline-construction time" -- on each worker. It is a FnAPI client and // ultimately responsible for correctly executing user code. +// +// Deprecated: Prefer MainWithOptions instead. func Main(ctx context.Context, loggingEndpoint, controlEndpoint string) error { + return MainWithOptions(ctx, loggingEndpoint, controlEndpoint, Options{}) +} + +// MainWithOptions is the main entrypoint for the Go harness. It runs at "runtime" -- not +// "pipeline-construction time" -- on each worker. It is a FnAPI client and +// ultimately responsible for correctly executing user code. +// +// Options are optional configurations for interfacing with the runner or similar. +func MainWithOptions(ctx context.Context, loggingEndpoint, controlEndpoint string, opts Options) error { hooks.DeserializeHooksFromOptions(ctx) - // Extract environment variables. These are optional runner supported capabilities. - // Expected env variables: - // RUNNER_CAPABILITIES : list of runner supported capability urn. - // STATUS_ENDPOINT : Endpoint to connect to status server used for worker status reporting. - statusEndpoint := os.Getenv("STATUS_ENDPOINT") - runnerCapabilities := strings.Split(os.Getenv("RUNNER_CAPABILITIES"), " ") rcMap := make(map[string]bool) - if len(runnerCapabilities) > 0 { - for _, capability := range runnerCapabilities { - rcMap[capability] = true - } + for _, capability := range opts.RunnerCapabilities { + rcMap[capability] = true } // Pass in the logging endpoint for use w/the default remote logging hook. @@ -151,8 +158,8 @@ func Main(ctx context.Context, loggingEndpoint, controlEndpoint string) error { } // if the runner supports worker status api then expose SDK harness status - if statusEndpoint != "" { - statusHandler, err := newWorkerStatusHandler(ctx, statusEndpoint, ctrl.cache, func(statusInfo *strings.Builder) { ctrl.metStoreToString(statusInfo) }) + if opts.StatusEndpoint != "" { + statusHandler, err := newWorkerStatusHandler(ctx, opts.StatusEndpoint, ctrl.cache, func(statusInfo *strings.Builder) { ctrl.metStoreToString(statusInfo) }) if err != nil { log.Errorf(ctx, "error establishing connection to worker status API: %v", err) } else { diff --git a/sdks/go/pkg/beam/core/runtime/harness/init/init.go b/sdks/go/pkg/beam/core/runtime/harness/init/init.go index 7cbf2158fd23..55f3d0beabdc 100644 --- a/sdks/go/pkg/beam/core/runtime/harness/init/init.go +++ b/sdks/go/pkg/beam/core/runtime/harness/init/init.go @@ -22,6 +22,7 @@ import ( "context" "encoding/json" "flag" + "strings" "time" "fmt" @@ -118,7 +119,18 @@ func hook() { if err := syscallx.SetProcessMemoryCeiling(memLimit, memLimit); err != nil && err != syscallx.ErrUnsupported { fmt.Println("Error Setting Rlimit ", err) } - if err := harness.Main(ctx, *loggingEndpoint, *controlEndpoint); err != nil { + + // Extract environment variables. These are optional runner supported capabilities. + // Expected env variables: + // RUNNER_CAPABILITIES : list of runner supported capability urn. + // STATUS_ENDPOINT : Endpoint to connect to status server used for worker status reporting. + statusEndpoint := os.Getenv("STATUS_ENDPOINT") + runnerCapabilities := strings.Split(os.Getenv("RUNNER_CAPABILITIES"), " ") + options := harness.Options{ + StatusEndpoint: statusEndpoint, + RunnerCapabilities: runnerCapabilities, + } + if err := harness.MainWithOptions(ctx, *loggingEndpoint, *controlEndpoint, options); err != nil { fmt.Fprintf(os.Stderr, "Worker failed: %v\n", err) switch ShutdownMode { case Terminate: diff --git a/sdks/go/pkg/beam/runners/universal/extworker/extworker.go b/sdks/go/pkg/beam/runners/universal/extworker/extworker.go index ffc8f8e47c09..e03ebf2ffc1e 100644 --- a/sdks/go/pkg/beam/runners/universal/extworker/extworker.go +++ b/sdks/go/pkg/beam/runners/universal/extworker/extworker.go @@ -22,6 +22,7 @@ import ( "net" "sync" + "github.com/apache/beam/sdks/v2/go/container/tools" "github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/harness" "github.com/apache/beam/sdks/v2/go/pkg/beam/log" fnpb "github.com/apache/beam/sdks/v2/go/pkg/beam/model/fnexecution_v1" @@ -89,10 +90,28 @@ func (s *Loopback) StartWorker(ctx context.Context, req *fnpb.StartWorkerRequest ctx = grpcx.WriteWorkerID(s.root, req.GetWorkerId()) ctx, s.workers[req.GetWorkerId()] = context.WithCancel(ctx) - go harness.Main(ctx, req.GetLoggingEndpoint().GetUrl(), req.GetControlEndpoint().GetUrl()) + opts := harnessOptions(ctx, req.GetProvisionEndpoint().GetUrl()) + + go harness.MainWithOptions(ctx, req.GetLoggingEndpoint().GetUrl(), req.GetControlEndpoint().GetUrl(), opts) return &fnpb.StartWorkerResponse{}, nil } +func harnessOptions(ctx context.Context, endpoint string) harness.Options { + var opts harness.Options + if endpoint == "" { + return opts + } + info, err := tools.ProvisionInfo(ctx, endpoint) + if err != nil { + log.Infof(ctx, "error talking to provision service worker, using defaults:%v", err) + return opts + } + + opts.StatusEndpoint = info.GetStatusEndpoint().GetUrl() + opts.RunnerCapabilities = info.GetRunnerCapabilities() + return opts +} + // StopWorker terminates a worker harness, implementing BeamFnExternalWorkerPoolServer.StopWorker. func (s *Loopback) StopWorker(ctx context.Context, req *fnpb.StopWorkerRequest) (*fnpb.StopWorkerResponse, error) { log.Infof(ctx, "stopping worker %v", req.GetWorkerId())