Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Go] Pipeline Resource Hints #23990

Merged
merged 14 commits into from
Nov 8, 2022
Original file line number Diff line number Diff line change
Expand Up @@ -1959,12 +1959,18 @@ message ExecutableStagePayload {
}
}

// See https://beam.apache.org/documentation/runtime/resource-hints/ for additional documentation
// on the behavior of StandardResourceHint.
message StandardResourceHints {
enum Enum {
// Describes hardware accelerators that are desired to have in the execution environment.
// Payload: ASCII encoded string with the following format: "type:<type>;count:<n>;<options>" where type
// is an accelerator sku, count is the number of accelerators per worker, and options are
// related options flags.
ACCELERATOR = 0 [(beam_urn) = "beam:resources:accelerator:v1"];
// Describes desired minimal available RAM size in transform's execution environment.
// SDKs should convert the size to bytes, but can allow users to specify human-friendly units (e.g. GiB).
// Payload: ASCII encoded string of the base 10 representation of an integer number of bytes.
MIN_RAM_BYTES = 1 [(beam_urn) = "beam:resources:min_ram_bytes:v1"];
}
}
1 change: 1 addition & 0 deletions sdks/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ require (
cloud.google.com/go/pubsub v1.26.0
cloud.google.com/go/storage v1.27.0
github.com/docker/go-connections v0.4.0
github.com/dustin/go-humanize v1.0.0
github.com/go-sql-driver/mysql v1.6.0
github.com/golang/protobuf v1.5.2 // TODO(danoliveira): Fully replace this with google.golang.org/protobuf
github.com/google/go-cmp v0.5.9
Expand Down
1 change: 1 addition & 0 deletions sdks/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -392,6 +392,7 @@ github.com/docker/libtrust v0.0.0-20150114040149-fa567046d9b1/go.mod h1:cyGadeNE
github.com/docker/spdystream v0.0.0-20160310174837-449fdfce4d96/go.mod h1:Qh8CwZgvJUkLughtfhJv5dyTYa91l1fOUCrgjqmcifM=
github.com/docopt/docopt-go v0.0.0-20180111231733-ee0de3bc6815/go.mod h1:WwZ+bS3ebgob9U8Nd0kOddGdZWjyMGR8Wziv+TBNwSE=
github.com/dustin/go-humanize v0.0.0-20171111073723-bb3d318650d4/go.mod h1:HtrtbFcZ19U5GC7JDqmcUSB87Iq5E25KnS6fMYU6eOk=
github.com/dustin/go-humanize v1.0.0 h1:VSnTsYCnlFHaM2/igO1h6X3HA71jcobQuxemgkq4zYo=
github.com/dustin/go-humanize v1.0.0/go.mod h1:HtrtbFcZ19U5GC7JDqmcUSB87Iq5E25KnS6fMYU6eOk=
github.com/elazarl/goproxy v0.0.0-20180725130230-947c36da3153/go.mod h1:/Zj4wYkgs4iZTTu3o/KG3Itv/qCCa8VVMlb3i9OVuzc=
github.com/emicklei/go-restful v0.0.0-20170410110728-ff4f55a20633/go.mod h1:otzb+WCGbkyDHkqmQmT5YD2WR4BBwUdeQoFo8l/7tVs=
Expand Down
29 changes: 21 additions & 8 deletions sdks/go/pkg/beam/core/runtime/graphx/translate.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,12 +31,12 @@ import (
"github.com/apache/beam/sdks/v2/go/pkg/beam/core/util/protox"
"github.com/apache/beam/sdks/v2/go/pkg/beam/internal/errors"
pipepb "github.com/apache/beam/sdks/v2/go/pkg/beam/model/pipeline_v1"
"github.com/apache/beam/sdks/v2/go/pkg/beam/options/resource"
"github.com/golang/protobuf/proto"
"google.golang.org/protobuf/types/known/durationpb"
)

// Model constants for interfacing with a Beam runner.
// TODO(lostluck): 2018/05/28 Extract these from their enum descriptors in the pipeline_v1 proto
const (
URNImpulse = "beam:transform:impulse:v1"
URNParDo = "beam:transform:pardo:v1"
Expand Down Expand Up @@ -137,12 +137,16 @@ func CreateEnvironment(ctx context.Context, urn string, extractEnvironmentConfig
}, nil
}

// TODO(herohde) 11/6/2017: move some of the configuration into the graph during construction.
// TODO(https://github.com/apache/beam/issues/23893): Along with scoped resource hints,
// move some of the configuration into the graph during construction.

// Options for marshalling a graph into a model pipeline.
type Options struct {
// Environment used to run the user code.
Environment *pipepb.Environment

// PipelineResourceHints for setting defaults across the whole pipeline.
PipelineResourceHints resource.Hints
}

// Marshal converts a graph to a model pipeline.
Expand Down Expand Up @@ -871,11 +875,11 @@ func (m *marshaller) expandCoGBK(edge NamedEdge) (string, error) {
//
// In particular, the "backup plan" needs to:
//
// * Encode the windowed element, preserving timestamps.
// * Add random keys to the encoded windowed element []bytes
// * GroupByKey (in the global window).
// * Explode the resulting elements list.
// * Decode the windowed element []bytes.
// - Encode the windowed element, preserving timestamps.
// - Add random keys to the encoded windowed element []bytes
// - GroupByKey (in the global window).
// - Explode the resulting elements list.
// - Decode the windowed element []bytes.
//
// While a simple reshard can be written in user terms, (timestamps and windows
// are accessible to user functions) there are some framework internal
Expand Down Expand Up @@ -1122,7 +1126,16 @@ const defaultEnvId = "go"

func (m *marshaller) addDefaultEnv() string {
if _, exists := m.environments[defaultEnvId]; !exists {
m.environments[defaultEnvId] = m.opt.Environment
env := proto.Clone(m.opt.Environment).(*pipepb.Environment)
// If there's no environment set, we need to ignore
if env == nil {
return defaultEnvId
}
// Add the pipeline level resource hints here for now.
// TODO(https://github.com/apache/beam/issues/23893) move to a better place for
// scoped hints in next pass, which affect number of environments set by Go pipelines.
env.ResourceHints = m.opt.PipelineResourceHints.Payloads()
damccorm marked this conversation as resolved.
Show resolved Hide resolved
m.environments[defaultEnvId] = env
}
return defaultEnvId
}
Expand Down
2 changes: 1 addition & 1 deletion sdks/go/pkg/beam/core/runtime/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ func (o *Options) Export() RawOptions {
}

// LoadOptionsFromFlags adds any flags not defined in excludeFlags to the options.
// If the key is already defnined, it ignores that flag
// If the key is already defined, it ignores that flag.
func (o *Options) LoadOptionsFromFlags(excludeFlags map[string]bool) {
o.mu.Lock()
defer o.mu.Unlock()
Expand Down
56 changes: 56 additions & 0 deletions sdks/go/pkg/beam/options/jobopts/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"github.com/apache/beam/sdks/v2/go/pkg/beam/core"
"github.com/apache/beam/sdks/v2/go/pkg/beam/internal/errors"
"github.com/apache/beam/sdks/v2/go/pkg/beam/log"
"github.com/apache/beam/sdks/v2/go/pkg/beam/options/resource"
)

func init() {
Expand All @@ -45,6 +46,12 @@ func init() {
"have no more than 1 override applied to it. If multiple "+
"overrides match a container image it is arbitrary which "+
"will be applied.")
flag.Var(&ResourceHints,
"resource_hints",
"Set whole pipeline level resource hints, accepting values of the format '<urn>=<value>'. "+
"eg.'min_ram=12GB', 'beam:resources:accelerator:v1='runner_specific' "+
"In case of duplicate hint URNs, the last value specified will be used. "+
"See https://beam.apache.org/documentation/runtime/resource-hints/ for more information.")
}

var (
Expand Down Expand Up @@ -92,6 +99,9 @@ var (

// Flag to set the degree of parallelism. If not set, the configured Flink default is used, or 1 if none can be found.
Parallelism = flag.Int("parallelism", -1, "The degree of parallelism to be used when distributing operations onto Flink workers.")

// ResourceHints flag takes whole pipeline hints for resources.
ResourceHints stringSlice
)

type missingFlagError error
Expand Down Expand Up @@ -169,3 +179,49 @@ func GetExperiments() []string {
}
return strings.Split(*Experiments, ",")
}

// GetPipelineResourceHints parses known standard hints and returns the flag set hints for the pipeline.
// In case of duplicate hint URNs, the last value specified will be used.
func GetPipelineResourceHints() resource.Hints {
hints := make([]resource.Hint, 0, len(ResourceHints))
for _, hint := range ResourceHints {
name, val, ok := strings.Cut(hint, "=")
if !ok {
panic(fmt.Sprintf("unparsable resource hint: %q", hint))
}
var h resource.Hint
switch name {
case "min_ram", "beam:resources:min_ram_bytes:v1":
h = resource.ParseMinRam(val)
case "accelerator", "beam:resources:accelerator:v1":
h = resource.Accelerator(val)
default:
if strings.HasPrefix(name, "beam:resources:") {
h = stringHint{urn: name, value: val}
} else {
panic(fmt.Sprintf("unknown resource hint: %v", hint))
}
}
hints = append(hints, h)
}
return resource.NewHints(hints...)
}

// stringHint is a backup implementation of hint for new standard hints.
type stringHint struct {
urn, value string
}

func (h stringHint) URN() string {
return h.urn
}

func (h stringHint) Payload() []byte {
// Go strings are utf8, and if the string is ascii,
// byte conversion handles that directly.
return []byte(h.value)
}

func (h stringHint) MergeWithOuter(outer resource.Hint) resource.Hint {
return h
}
50 changes: 45 additions & 5 deletions sdks/go/pkg/beam/options/jobopts/options_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"testing"

"github.com/apache/beam/sdks/v2/go/pkg/beam/core"
"github.com/apache/beam/sdks/v2/go/pkg/beam/options/resource"
)

func TestGetEndpoint(t *testing.T) {
Expand Down Expand Up @@ -68,32 +69,41 @@ func TestGetJobName(t *testing.T) {
}
}

func TestGetEnvironamentUrn(t *testing.T) {
// Also tests IsLoopback because it uses the same flag.
func TestGetEnvironmentUrn(t *testing.T) {
tests := []struct {
env string
urn string
env string
urn string
isLoopback bool
}{
{
"PROCESS",
"beam:env:process:v1",
false,
},
{
"DOCKER",
"beam:env:docker:v1",
false,
},
{
"LOOPBACK",
"beam:env:external:v1",
true,
},
{
"",
"beam:env:docker:v1",
false,
},
}
for _, test := range tests {
EnvironmentType = &test.env
if gotUrn := GetEnvironmentUrn(context.Background()); gotUrn != test.urn {
t.Errorf("GetEnvironmentUrn(ctx) = %v, want %v", gotUrn, test.urn)
if got, want := GetEnvironmentUrn(context.Background()), test.urn; got != want {
t.Errorf("GetEnvironmentUrn(%v) = %v, want %v", test.env, got, want)
}
if got, want := IsLoopback(), test.isLoopback; got != want {
t.Errorf("IsLoopback(%v) = %v, want %v", test.env, got, want)
}
}
}
Expand Down Expand Up @@ -129,3 +139,33 @@ func TestGetSdkImageOverrides(t *testing.T) {
t.Errorf("GetSdkImageOverrides() = %v, want %v", got, want)
}
}

func TestGetPipelineResourceHints(t *testing.T) {
var hints stringSlice
hints.Set("min_ram=2GB")
hints.Set("beam:resources:min_ram_bytes:v1=16GB")
hints.Set("beam:resources:accelerator:v1=cheetah")
hints.Set("accelerator=pedal_to_the_metal")
hints.Set("beam:resources:novel_execution:v1=jaguar")
hints.Set("min_ram=1GB")
ResourceHints = hints

want := resource.NewHints(resource.ParseMinRam("1GB"), resource.Accelerator("pedal_to_the_metal"), stringHint{
urn: "beam:resources:novel_execution:v1",
value: "jaguar",
})
if got := GetPipelineResourceHints(); !got.Equal(want) {
t.Errorf("GetPipelineResourceHints() = %v, want %v", got, want)
}
}

func TestGetExperiements(t *testing.T) {
*Experiments = ""
if got, want := GetExperiments(), []string(nil); !reflect.DeepEqual(got, want) {
t.Errorf("GetExperiments(\"\") = %v, want %v", got, want)
}
*Experiments = "better,faster,stronger"
if got, want := GetExperiments(), []string{"better", "faster", "stronger"}; !reflect.DeepEqual(got, want) {
t.Errorf("GetExperiments(\"\") = %v, want %v", got, want)
}
}
8 changes: 5 additions & 3 deletions sdks/go/pkg/beam/options/jobopts/stringSlice.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,12 @@ import (
// of the flag.
//
// Example:
// var myFlags stringSlice
// flag.Var(&myFlags, "my_flag", "A list of flags")
//
// var myFlags stringSlice
// flag.Var(&myFlags, "my_flag", "A list of flags")
// $cmd -my_flag foo -my_flag bar
//
// With the example above, the slice can be set to contain ["foo", "bar"]:
// cmd -my_flag foo -my_flag bar
type stringSlice []string

// String implements the String method of flag.Value. This outputs the value
Expand Down
Loading