Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[GoSDK + Prism] Support Process env execution. #33651

Merged
merged 3 commits into from
Jan 18, 2025
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
@@ -74,12 +74,14 @@
* External, Process based Worker Pool support added to the Go SDK container. ([#33572](https://github.com/apache/beam/pull/33572))
* This is used to enable sidecar containers to run SDK workers for some runners.
* See https://beam.apache.org/documentation/runtime/sdk-harness-config/ for details.
* Support the Process Environment for execution in the Go SDK. ([#33651](https://github.com/apache/beam/pull/33651))
* Prism
* Prism now uses the same single port for both pipeline submission and execution on workers. Requests are differentiated by worker-id. ([#33438](https://github.com/apache/beam/pull/33438))
* This avoids port starvation and provides clarity on port use when running Prism in non-local environments.
* Support for @RequiresTimeSortedInputs added. ([#33513](https://github.com/apache/beam/issues/33513))
* Initial support for AllowedLateness added. ([#33542](https://github.com/apache/beam/pull/33542))
* The Go SDK's inprocess Prism runner (AKA the Go SDK default runner) now supports non-loopback mode environment types. ([#33572](https://github.com/apache/beam/pull/33572))
* Support the Process Environment for execution in Prism ([#33651](https://github.com/apache/beam/pull/33651))

## Breaking Changes

9 changes: 7 additions & 2 deletions sdks/go/pkg/beam/core/runtime/graphx/translate.go
Original file line number Diff line number Diff line change
@@ -17,6 +17,7 @@ package graphx

import (
"context"
"encoding/json"
"fmt"
"sort"
"strings"
@@ -122,8 +123,12 @@ func CreateEnvironment(ctx context.Context, urn string, extractEnvironmentConfig
var serializedPayload []byte
switch urn {
case URNEnvProcess:
// TODO Support process based SDK Harness.
return nil, errors.Errorf("unsupported environment %v", urn)
config := extractEnvironmentConfig(ctx)
payload := &pipepb.ProcessPayload{}
if err := json.Unmarshal([]byte(config), payload); err != nil {
return nil, fmt.Errorf("unable to json unmarshal --environment_config: %w", err)
}
serializedPayload = protox.MustEncode(payload)
case URNEnvExternal:
config := extractEnvironmentConfig(ctx)
payload := &pipepb.ExternalPayload{Endpoint: &pipepb.ApiServiceDescriptor{Url: config}}
31 changes: 22 additions & 9 deletions sdks/go/pkg/beam/core/runtime/graphx/translate_test.go
Original file line number Diff line number Diff line change
@@ -296,22 +296,22 @@ func (fn *splitPickFn) ProcessElement(_ *testRT, a int, small, big func(int)) {
}

func TestCreateEnvironment(t *testing.T) {
t.Run("process", func(t *testing.T) {
const wantEnv = "process"
t.Run("processBadConfig", func(t *testing.T) {
urn := graphx.URNEnvProcess
got, err := graphx.CreateEnvironment(context.Background(), urn, func(_ context.Context) string { return wantEnv })
got, err := graphx.CreateEnvironment(context.Background(), urn, func(_ context.Context) string { return "not a real json" })
if err == nil {
t.Errorf("CreateEnvironment(%v) = %v error, want error since it's unsupported", urn, err)
t.Errorf("CreateEnvironment(%v) = %v error, want error since parsing should fail", urn, err)
}
want := (*pipepb.Environment)(nil)
if !proto.Equal(got, want) {
t.Errorf("CreateEnvironment(%v) = %v, want %v since it's unsupported", urn, got, want)
t.Errorf("CreateEnvironment(%v) = %v, want %v since creation should have failed", urn, got, want)
}
})
tests := []struct {
name string
urn string
payload func(name string) []byte
name string
configOverride string
urn string
payload func(name string) []byte
}{
{
name: "external",
@@ -331,12 +331,25 @@ func TestCreateEnvironment(t *testing.T) {
ContainerImage: name,
})
},
}, {
name: "process",
configOverride: "{ \"command\": \"process\" }",
urn: graphx.URNEnvProcess,
payload: func(name string) []byte {
return protox.MustEncode(&pipepb.ProcessPayload{
Command: name,
})
},
},
}
for _, test := range tests {
test := test
t.Run(test.name, func(t *testing.T) {
got, err := graphx.CreateEnvironment(context.Background(), test.urn, func(_ context.Context) string { return test.name })
config := test.name
if test.configOverride != "" {
config = test.configOverride
}
got, err := graphx.CreateEnvironment(context.Background(), test.urn, func(_ context.Context) string { return config })
if err != nil {
t.Errorf("CreateEnvironment(%v) = %v error, want nil", test.urn, err)
}
31 changes: 31 additions & 0 deletions sdks/go/pkg/beam/runners/prism/internal/environments.go
Original file line number Diff line number Diff line change
@@ -22,6 +22,8 @@ import (
"io"
"log/slog"
"os"
"os/exec"
"time"

fnpb "github.com/apache/beam/sdks/v2/go/pkg/beam/model/fnexecution_v1"
pipepb "github.com/apache/beam/sdks/v2/go/pkg/beam/model/pipeline_v1"
@@ -66,6 +68,16 @@ func runEnvironment(ctx context.Context, j *jobservices.Job, env string, wk *wor
logger.Error("unmarshing docker environment payload", "error", err)
}
return dockerEnvironment(ctx, logger, dp, wk, j.ArtifactEndpoint())
case urns.EnvProcess:
pp := &pipepb.ProcessPayload{}
if err := (proto.UnmarshalOptions{}).Unmarshal(e.GetPayload(), pp); err != nil {
logger.Error("unmarshing docker environment payload", "error", err)
}
go func() {
processEnvironment(ctx, pp, wk)
logger.Debug("environment stopped", slog.String("job", j.String()))
}()
return nil
default:
return fmt.Errorf("environment %v with urn %v unimplemented", env, e.GetUrn())
}
@@ -231,3 +243,22 @@ func dockerEnvironment(ctx context.Context, logger *slog.Logger, dp *pipepb.Dock

return nil
}

func processEnvironment(ctx context.Context, pp *pipepb.ProcessPayload, wk *worker.W) {
cmd := exec.CommandContext(ctx, pp.GetCommand(), "--id="+wk.ID, "--provision_endpoint="+wk.Endpoint())

cmd.WaitDelay = time.Millisecond * 100
cmd.Stderr = os.Stderr
cmd.Stdout = os.Stdout
cmd.Env = os.Environ()

for k, v := range pp.GetEnv() {
cmd.Env = append(cmd.Environ(), fmt.Sprintf("%v=%v", k, v))
}
if err := cmd.Start(); err != nil {
return
}
// Job processing happens here, but orchestrated by other goroutines
// This call blocks until the context is cancelled, or the command exits.
cmd.Wait()
}