diff --git a/cmd/training-operator.v1/main.go b/cmd/training-operator.v1/main.go index 9b338bf5ac..4b09586cdb 100644 --- a/cmd/training-operator.v1/main.go +++ b/cmd/training-operator.v1/main.go @@ -20,6 +20,7 @@ import ( "flag" "fmt" "os" + "strings" "go.uber.org/zap/zapcore" "k8s.io/apimachinery/pkg/runtime" @@ -29,8 +30,11 @@ import ( ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/healthz" "sigs.k8s.io/controller-runtime/pkg/log/zap" + schedulerpluginsv1alpha1 "sigs.k8s.io/scheduler-plugins/apis/scheduling/v1alpha1" "volcano.sh/apis/pkg/apis/scheduling/v1beta1" + volcanoclient "volcano.sh/apis/pkg/client/clientset/versioned" + "github.com/kubeflow/common/pkg/controller.v1/common" commonutil "github.com/kubeflow/common/pkg/util" kubeflowv1 "github.com/kubeflow/training-operator/pkg/apis/kubeflow.org/v1" "github.com/kubeflow/training-operator/pkg/config" @@ -47,6 +51,7 @@ func init() { utilruntime.Must(clientgoscheme.AddToScheme(scheme)) utilruntime.Must(kubeflowv1.AddToScheme(scheme)) utilruntime.Must(v1beta1.AddToScheme(scheme)) + utilruntime.Must(schedulerpluginsv1alpha1.AddToScheme(scheme)) //+kubebuilder:scaffold:scheme } @@ -56,7 +61,6 @@ func main() { var leaderElectionID string var probeAddr string var enabledSchemes controllerv1.EnabledSchemes - var enableGangScheduling bool var gangSchedulerName string var namespace string var monitoringPort int @@ -69,8 +73,7 @@ func main() { flag.StringVar(&leaderElectionID, "leader-election-id", "1ca428e5.training-operator.kubeflow.org", "The ID for leader election.") flag.Var(&enabledSchemes, "enable-scheme", "Enable scheme(s) as --enable-scheme=tfjob --enable-scheme=pytorchjob, case insensitive."+ " Now supporting TFJob, PyTorchJob, MXNetJob, XGBoostJob, PaddleJob. By default, all supported schemes will be enabled.") - flag.BoolVar(&enableGangScheduling, "enable-gang-scheduling", false, "Set true to enable gang scheduling") - flag.StringVar(&gangSchedulerName, "gang-scheduler-name", "volcano", "The scheduler to gang-schedule kubeflow jobs, defaults to volcano") + flag.StringVar(&gangSchedulerName, "gang-scheduler-name", "none", "The scheduler to gang-schedule kubeflow jobs, defaults to none") flag.StringVar(&namespace, "namespace", os.Getenv(commonutil.EnvKubeflowNamespace), "The namespace to monitor kubeflow jobs. If unset, it monitors all namespaces cluster-wide."+ "If set, it only monitors kubeflow jobs in the given namespace.") flag.IntVar(&monitoringPort, "monitoring-port", 9443, "Endpoint port for displaying monitoring metrics. "+ @@ -110,6 +113,16 @@ func main() { os.Exit(1) } + // Prepare GangSchedulingSetupFunc + gangSchedulingSetupFunc := common.GenNonGangSchedulerSetupFunc() + if strings.EqualFold(gangSchedulerName, string(common.GangSchedulerVolcano)) { + cfg := mgr.GetConfig() + volcanoClientSet := volcanoclient.NewForConfigOrDie(cfg) + gangSchedulingSetupFunc = common.GenVolcanoSetupFunc(volcanoClientSet) + } else if strings.EqualFold(gangSchedulerName, string(common.GangSchedulerSchedulerPlugins)) { + gangSchedulingSetupFunc = common.GenSchedulerPluginsSetupFunc(mgr.GetClient()) + } + // TODO: We need a general manager. 
all rest reconciler addsToManager // Based on the user configuration, we start different controllers if enabledSchemes.Empty() { @@ -122,7 +135,7 @@ func main() { "scheme not supported", "scheme", s) os.Exit(1) } - if err = setupFunc(mgr, enableGangScheduling, controllerThreads); err != nil { + if err = setupFunc(mgr, gangSchedulingSetupFunc, controllerThreads); err != nil { setupLog.Error(err, "unable to create controller", "controller", s) os.Exit(1) } diff --git a/go.mod b/go.mod index d7373d8a6a..a2c8d0d52b 100644 --- a/go.mod +++ b/go.mod @@ -4,7 +4,7 @@ go 1.19 require ( github.com/go-logr/logr v1.2.3 - github.com/kubeflow/common v0.4.5 + github.com/kubeflow/common v0.4.6 github.com/onsi/ginkgo/v2 v2.1.6 github.com/onsi/gomega v1.20.1 github.com/prometheus/client_golang v1.12.2 @@ -18,6 +18,7 @@ require ( k8s.io/kube-openapi v0.0.0-20220803162953-67bda5d908f1 k8s.io/utils v0.0.0-20220728103510-ee6ede2d64ed sigs.k8s.io/controller-runtime v0.13.0 + sigs.k8s.io/scheduler-plugins v0.24.9 sigs.k8s.io/yaml v1.3.0 volcano.sh/apis v1.2.0-k8s1.19.6 ) @@ -68,11 +69,11 @@ require ( go.uber.org/multierr v1.8.0 // indirect golang.org/x/crypto v0.0.0-20220525230936-793ad666bf5e // indirect golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4 // indirect - golang.org/x/net v0.0.0-20220722155237-a158d28d115b // indirect + golang.org/x/net v0.3.1-0.20221206200815-1e63c2f08a10 // indirect golang.org/x/oauth2 v0.0.0-20211104180415-d3ed0bb246c8 // indirect - golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f // indirect - golang.org/x/term v0.0.0-20210927222741-03fcf44c2211 // indirect - golang.org/x/text v0.3.7 // indirect + golang.org/x/sys v0.3.0 // indirect + golang.org/x/term v0.3.0 // indirect + golang.org/x/text v0.5.0 // indirect golang.org/x/time v0.0.0-20220609170525-579cf78fd858 // indirect golang.org/x/tools v0.1.12 // indirect gomodules.xyz/jsonpatch/v2 v2.2.0 // indirect diff --git a/go.sum b/go.sum index d060cbe281..b9fbfacc7b 100644 --- a/go.sum +++ b/go.sum @@ -327,8 +327,8 @@ github.com/kr/pty v1.1.5/go.mod h1:9r2w37qlBe7rQ6e1fg1S/9xpWHSnaqNdHD3WcMdbPDA= github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= -github.com/kubeflow/common v0.4.5 h1:W7p+s/4Za1UzIgKP2Z6ormEvsUVHykeaXaOuu8+UgpI= -github.com/kubeflow/common v0.4.5/go.mod h1:di43u2m7DyuwnRDb7Kwz1nmA/nhpjnQ+K+gWCV/SPZk= +github.com/kubeflow/common v0.4.6 h1:yzJf/HEdS6ginD0GlVkgbOFie0Sp66VdGjXidAGZIlk= +github.com/kubeflow/common v0.4.6/go.mod h1:43MAof/uhpJA2C0urynqatE3oKFQc7m2HLmJty7waqY= github.com/mailru/easyjson v0.0.0-20160728113105-d5b7844b561a/go.mod h1:C1wdFJiN94OJF2b5HbByQZoLdCWB1Yqtg26g4irojpc= github.com/mailru/easyjson v0.0.0-20190614124828-94de47d64c63/go.mod h1:C1wdFJiN94OJF2b5HbByQZoLdCWB1Yqtg26g4irojpc= github.com/mailru/easyjson v0.0.0-20190626092158-b2ccc519800e/go.mod h1:C1wdFJiN94OJF2b5HbByQZoLdCWB1Yqtg26g4irojpc= @@ -555,8 +555,8 @@ golang.org/x/net v0.0.0-20210405180319-a5a99cb37ef4/go.mod h1:p54w0d4576C0XHj96b golang.org/x/net v0.0.0-20210503060351-7fd8e65b6420/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= golang.org/x/net v0.0.0-20210525063256-abc453219eb5/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= golang.org/x/net v0.0.0-20211112202133-69e39bad7dc2/go.mod 
h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= -golang.org/x/net v0.0.0-20220722155237-a158d28d115b h1:PxfKdU9lEEDYjdIzOtC4qFWgkU2rGHdKlKowJSMN9h0= -golang.org/x/net v0.0.0-20220722155237-a158d28d115b/go.mod h1:XRhObCWvk6IyKnWLug+ECip1KBveYUHfp+8e9klMJ9c= +golang.org/x/net v0.3.1-0.20221206200815-1e63c2f08a10 h1:Frnccbp+ok2GkUS2tC84yAq/U9Vg+0sIO7aRL3T4Xnc= +golang.org/x/net v0.3.1-0.20221206200815-1e63c2f08a10/go.mod h1:MBQ8lrhLObU/6UmLb4fmbmk5OcyYmqtbGd/9yIeKjEE= golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= golang.org/x/oauth2 v0.0.0-20190226205417-e64efc72b421/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw= golang.org/x/oauth2 v0.0.0-20190604053449-0f29369cfe45/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw= @@ -648,11 +648,11 @@ golang.org/x/sys v0.0.0-20210823070655-63515b42dcdf/go.mod h1:oPkhp1MJrh7nUepCBc golang.org/x/sys v0.0.0-20210908233432-aa78b53d3365/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220114195835-da31bd327af9/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220412211240-33da011f77ad/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f h1:v4INt8xihDGvnrfjMDVXGxw9wrfxYyCjk0KbXjhR55s= -golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.3.0 h1:w8ZOecv6NaNa/zC8944JTU3vz4u6Lagfk4RPQxv92NQ= +golang.org/x/sys v0.3.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= -golang.org/x/term v0.0.0-20210927222741-03fcf44c2211 h1:JGgROgKl9N8DuW20oFS5gxc+lE67/N3FcwmBPMe7ArY= -golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8= +golang.org/x/term v0.3.0 h1:qoo4akIqOcDME5bhc/NgxUdovd6BSS2uMsVjB56q1xI= +golang.org/x/term v0.3.0/go.mod h1:q750SLmJuPmVoN1blW3UFBPREJfb1KmY3vwxfr+nFDA= golang.org/x/text v0.0.0-20170915032832-14c0d48ead0c/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.1-0.20180807135948-17ff2d5776d2/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= @@ -661,8 +661,8 @@ golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/text v0.3.4/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/text v0.3.5/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= -golang.org/x/text v0.3.7 h1:olpwvP2KacW1ZWvsR7uQhoyTYvKAupfQrRGBFM352Gk= -golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ= +golang.org/x/text v0.5.0 h1:OLmvp0KP+FVG99Ct/qFiL/Fhk4zp4QQnZ7b2U+5piUM= +golang.org/x/text v0.5.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8= golang.org/x/time v0.0.0-20180412165947-fbb02b2291d2/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/time v0.0.0-20181108054448-85acf8d2951c/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/time v0.0.0-20190308202827-9d24e82272b4/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= @@ -954,6 +954,8 @@ sigs.k8s.io/controller-runtime v0.13.0 h1:iqa5RNciy7ADWnIc8QxCbOX5FEKVR3uxVxKHRM sigs.k8s.io/controller-runtime v0.13.0/go.mod h1:Zbz+el8Yg31jubvAEyglRZGdLAjplZl+PgtYNI6WNTI= sigs.k8s.io/json 
v0.0.0-20220713155537-f223a00ba0e2 h1:iXTIw73aPyC+oRdyqqvVJuloN1p0AC/kzH07hu3NE+k= sigs.k8s.io/json v0.0.0-20220713155537-f223a00ba0e2/go.mod h1:B8JuhiUyNFVKdsE8h686QcCxMaH6HrOAZj4vswFpcB0= +sigs.k8s.io/scheduler-plugins v0.24.9 h1:9oGtwk6uh7mZMCX8+O+PipQzBiRq9d2+E3xq1cn7zbc= +sigs.k8s.io/scheduler-plugins v0.24.9/go.mod h1:0u2b/0SwY2ozDhOD/f1S3e5IbStoDFLUK8yP5dJTaQ8= sigs.k8s.io/structured-merge-diff/v4 v4.0.1/go.mod h1:bJZC9H9iH24zzfZ/41RGcq60oK1F7G282QMXDPYydCw= sigs.k8s.io/structured-merge-diff/v4 v4.2.3 h1:PRbqxJClWWYMNV1dhaG4NsibJbArud9kFxnAMREiWFE= sigs.k8s.io/structured-merge-diff/v4 v4.2.3/go.mod h1:qjx8mGObPmV2aSZepjQjbmb2ihdVs8cGKBraizNC69E= diff --git a/hack/python-sdk/swagger.json b/hack/python-sdk/swagger.json index c931a76eb5..7247abd6ba 100644 --- a/hack/python-sdk/swagger.json +++ b/hack/python-sdk/swagger.json @@ -714,7 +714,11 @@ "format": "int32" }, "labelSelector": { - "description": "A label selector is a label query over a set of resources. The result of matchLabels and matchExpressions are ANDed. An empty label selector matches all objects. A null label selector matches no objects.", + "description": "Deprecated: Use Selector instead", + "$ref": "#/definitions/v1.LabelSelector" + }, + "selector": { + "description": "A Selector is a label query over a set of resources. The result of matchLabels and matchExpressions are ANDed. An empty Selector matches all objects. A null Selector matches no objects.", "type": "string" }, "succeeded": { @@ -773,6 +777,10 @@ }, "queue": { "type": "string" + }, + "scheduleTimeoutSeconds": { + "type": "integer", + "format": "int32" } } } diff --git a/manifests/base/cluster-role.yaml b/manifests/base/cluster-role.yaml index ae3732dafa..eb6004ebb0 100644 --- a/manifests/base/cluster-role.yaml +++ b/manifests/base/cluster-role.yaml @@ -91,3 +91,9 @@ rules: - horizontalpodautoscalers verbs: - "*" + - apiGroups: + - scheduling.sigs.k8s.io + resources: + - podgroups + verbs: + - "*" diff --git a/manifests/base/crds/kubeflow.org_mpijobs.yaml b/manifests/base/crds/kubeflow.org_mpijobs.yaml index 4f13e0db48..b1a84dc7f7 100644 --- a/manifests/base/crds/kubeflow.org_mpijobs.yaml +++ b/manifests/base/crds/kubeflow.org_mpijobs.yaml @@ -7397,6 +7397,9 @@ spec: type: string queue: type: string + scheduleTimeoutSeconds: + format: int32 + type: integer type: object ttlSecondsAfterFinished: description: TTLSecondsAfterFinished is the TTL to clean up jobs. @@ -7476,10 +7479,55 @@ spec: format: int32 type: integer labelSelector: - description: A label selector is a label query over a set of - resources. The result of matchLabels and matchExpressions - are ANDed. An empty label selector matches all objects. A - null label selector matches no objects. + description: 'Deprecated: Use Selector instead' + properties: + matchExpressions: + description: matchExpressions is a list of label selector + requirements. The requirements are ANDed. + items: + description: A label selector requirement is a selector + that contains values, a key, and an operator that relates + the key and values. + properties: + key: + description: key is the label key that the selector + applies to. + type: string + operator: + description: operator represents a key's relationship + to a set of values. Valid operators are In, NotIn, + Exists and DoesNotExist. + type: string + values: + description: values is an array of string values. + If the operator is In or NotIn, the values array + must be non-empty. If the operator is Exists or + DoesNotExist, the values array must be empty. 
This + array is replaced during a strategic merge patch. + items: + type: string + type: array + required: + - key + - operator + type: object + type: array + matchLabels: + additionalProperties: + type: string + description: matchLabels is a map of {key,value} pairs. + A single {key,value} in the matchLabels map is equivalent + to an element of matchExpressions, whose key field is + "key", the operator is "In", and the values array contains + only "value". The requirements are ANDed. + type: object + type: object + x-kubernetes-map-type: atomic + selector: + description: A Selector is a label query over a set of resources. + The result of matchLabels and matchExpressions are ANDed. + An empty Selector matches all objects. A null Selector matches + no objects. type: string succeeded: description: The number of pods which reached phase Succeeded. diff --git a/manifests/base/crds/kubeflow.org_mxjobs.yaml b/manifests/base/crds/kubeflow.org_mxjobs.yaml index ec7bab0a1d..0b70d7316b 100644 --- a/manifests/base/crds/kubeflow.org_mxjobs.yaml +++ b/manifests/base/crds/kubeflow.org_mxjobs.yaml @@ -7397,6 +7397,9 @@ spec: type: string queue: type: string + scheduleTimeoutSeconds: + format: int32 + type: integer type: object ttlSecondsAfterFinished: description: TTLSecondsAfterFinished is the TTL to clean up jobs. @@ -7472,10 +7475,55 @@ spec: format: int32 type: integer labelSelector: - description: A label selector is a label query over a set of - resources. The result of matchLabels and matchExpressions - are ANDed. An empty label selector matches all objects. A - null label selector matches no objects. + description: 'Deprecated: Use Selector instead' + properties: + matchExpressions: + description: matchExpressions is a list of label selector + requirements. The requirements are ANDed. + items: + description: A label selector requirement is a selector + that contains values, a key, and an operator that relates + the key and values. + properties: + key: + description: key is the label key that the selector + applies to. + type: string + operator: + description: operator represents a key's relationship + to a set of values. Valid operators are In, NotIn, + Exists and DoesNotExist. + type: string + values: + description: values is an array of string values. + If the operator is In or NotIn, the values array + must be non-empty. If the operator is Exists or + DoesNotExist, the values array must be empty. This + array is replaced during a strategic merge patch. + items: + type: string + type: array + required: + - key + - operator + type: object + type: array + matchLabels: + additionalProperties: + type: string + description: matchLabels is a map of {key,value} pairs. + A single {key,value} in the matchLabels map is equivalent + to an element of matchExpressions, whose key field is + "key", the operator is "In", and the values array contains + only "value". The requirements are ANDed. + type: object + type: object + x-kubernetes-map-type: atomic + selector: + description: A Selector is a label query over a set of resources. + The result of matchLabels and matchExpressions are ANDed. + An empty Selector matches all objects. A null Selector matches + no objects. type: string succeeded: description: The number of pods which reached phase Succeeded. 
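Note (not part of the patch): the CRD hunks above and below add a `scheduleTimeoutSeconds` field under `schedulingPolicy` and replace the old string `labelSelector` status field with a deprecated structured `labelSelector` plus a string `selector`. A minimal sketch of how a job author would set the new timeout, assuming kubeflow/common v0.4.6 exposes it as a `*int32` on `commonv1.SchedulingPolicy` (as the optional, `format: int32` CRD schema suggests); the job kind and the value 120 are purely illustrative:

```go
package main

import (
	commonv1 "github.com/kubeflow/common/pkg/apis/common/v1"
	kubeflowv1 "github.com/kubeflow/training-operator/pkg/apis/kubeflow.org/v1"
)

// newJobWithScheduleTimeout builds a PyTorchJob whose gang-scheduled pods
// wait at most `timeout` seconds for the PodGroup to become schedulable.
// The field is only meaningful when gang scheduling is enabled.
func newJobWithScheduleTimeout() *kubeflowv1.PyTorchJob {
	timeout := int32(120) // hypothetical value
	return &kubeflowv1.PyTorchJob{
		Spec: kubeflowv1.PyTorchJobSpec{
			RunPolicy: commonv1.RunPolicy{
				SchedulingPolicy: &commonv1.SchedulingPolicy{
					Queue:                  "default",
					ScheduleTimeoutSeconds: &timeout,
				},
			},
		},
	}
}
```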
diff --git a/manifests/base/crds/kubeflow.org_paddlejobs.yaml b/manifests/base/crds/kubeflow.org_paddlejobs.yaml index dedf3ccb8a..4446769ddb 100644 --- a/manifests/base/crds/kubeflow.org_paddlejobs.yaml +++ b/manifests/base/crds/kubeflow.org_paddlejobs.yaml @@ -7904,6 +7904,9 @@ spec: type: string queue: type: string + scheduleTimeoutSeconds: + format: int32 + type: integer type: object ttlSecondsAfterFinished: description: TTLSecondsAfterFinished is the TTL to clean up jobs. @@ -7978,10 +7981,55 @@ spec: format: int32 type: integer labelSelector: - description: A label selector is a label query over a set of - resources. The result of matchLabels and matchExpressions - are ANDed. An empty label selector matches all objects. A - null label selector matches no objects. + description: 'Deprecated: Use Selector instead' + properties: + matchExpressions: + description: matchExpressions is a list of label selector + requirements. The requirements are ANDed. + items: + description: A label selector requirement is a selector + that contains values, a key, and an operator that relates + the key and values. + properties: + key: + description: key is the label key that the selector + applies to. + type: string + operator: + description: operator represents a key's relationship + to a set of values. Valid operators are In, NotIn, + Exists and DoesNotExist. + type: string + values: + description: values is an array of string values. + If the operator is In or NotIn, the values array + must be non-empty. If the operator is Exists or + DoesNotExist, the values array must be empty. This + array is replaced during a strategic merge patch. + items: + type: string + type: array + required: + - key + - operator + type: object + type: array + matchLabels: + additionalProperties: + type: string + description: matchLabels is a map of {key,value} pairs. + A single {key,value} in the matchLabels map is equivalent + to an element of matchExpressions, whose key field is + "key", the operator is "In", and the values array contains + only "value". The requirements are ANDed. + type: object + type: object + x-kubernetes-map-type: atomic + selector: + description: A Selector is a label query over a set of resources. + The result of matchLabels and matchExpressions are ANDed. + An empty Selector matches all objects. A null Selector matches + no objects. type: string succeeded: description: The number of pods which reached phase Succeeded. @@ -8007,7 +8055,7 @@ spec: storage: true subresources: scale: - labelSelectorPath: .status.replicaStatuses.Worker.labelSelector + labelSelectorPath: .status.replicaStatuses.Worker.selector specReplicasPath: .spec.paddleReplicaSpecs.Worker.replicas statusReplicasPath: .status.replicaStatuses.Worker.active status: {} diff --git a/manifests/base/crds/kubeflow.org_pytorchjobs.yaml b/manifests/base/crds/kubeflow.org_pytorchjobs.yaml index db4c3c0bd1..bb2de4e1f7 100644 --- a/manifests/base/crds/kubeflow.org_pytorchjobs.yaml +++ b/manifests/base/crds/kubeflow.org_pytorchjobs.yaml @@ -7933,6 +7933,9 @@ spec: type: string queue: type: string + scheduleTimeoutSeconds: + format: int32 + type: integer type: object ttlSecondsAfterFinished: description: TTLSecondsAfterFinished is the TTL to clean up jobs. @@ -8007,10 +8010,55 @@ spec: format: int32 type: integer labelSelector: - description: A label selector is a label query over a set of - resources. The result of matchLabels and matchExpressions - are ANDed. An empty label selector matches all objects. A - null label selector matches no objects. 
+ description: 'Deprecated: Use Selector instead' + properties: + matchExpressions: + description: matchExpressions is a list of label selector + requirements. The requirements are ANDed. + items: + description: A label selector requirement is a selector + that contains values, a key, and an operator that relates + the key and values. + properties: + key: + description: key is the label key that the selector + applies to. + type: string + operator: + description: operator represents a key's relationship + to a set of values. Valid operators are In, NotIn, + Exists and DoesNotExist. + type: string + values: + description: values is an array of string values. + If the operator is In or NotIn, the values array + must be non-empty. If the operator is Exists or + DoesNotExist, the values array must be empty. This + array is replaced during a strategic merge patch. + items: + type: string + type: array + required: + - key + - operator + type: object + type: array + matchLabels: + additionalProperties: + type: string + description: matchLabels is a map of {key,value} pairs. + A single {key,value} in the matchLabels map is equivalent + to an element of matchExpressions, whose key field is + "key", the operator is "In", and the values array contains + only "value". The requirements are ANDed. + type: object + type: object + x-kubernetes-map-type: atomic + selector: + description: A Selector is a label query over a set of resources. + The result of matchLabels and matchExpressions are ANDed. + An empty Selector matches all objects. A null Selector matches + no objects. type: string succeeded: description: The number of pods which reached phase Succeeded. @@ -8036,7 +8084,7 @@ spec: storage: true subresources: scale: - labelSelectorPath: .status.replicaStatuses.Worker.labelSelector + labelSelectorPath: .status.replicaStatuses.Worker.selector specReplicasPath: .spec.pytorchReplicaSpecs.Worker.replicas statusReplicasPath: .status.replicaStatuses.Worker.active status: {} diff --git a/manifests/base/crds/kubeflow.org_tfjobs.yaml b/manifests/base/crds/kubeflow.org_tfjobs.yaml index 1074725642..a36d8b1734 100644 --- a/manifests/base/crds/kubeflow.org_tfjobs.yaml +++ b/manifests/base/crds/kubeflow.org_tfjobs.yaml @@ -86,6 +86,9 @@ spec: type: string queue: type: string + scheduleTimeoutSeconds: + format: int32 + type: integer type: object ttlSecondsAfterFinished: description: TTLSecondsAfterFinished is the TTL to clean up jobs. @@ -7473,10 +7476,55 @@ spec: format: int32 type: integer labelSelector: - description: A label selector is a label query over a set of - resources. The result of matchLabels and matchExpressions - are ANDed. An empty label selector matches all objects. A - null label selector matches no objects. + description: 'Deprecated: Use Selector instead' + properties: + matchExpressions: + description: matchExpressions is a list of label selector + requirements. The requirements are ANDed. + items: + description: A label selector requirement is a selector + that contains values, a key, and an operator that relates + the key and values. + properties: + key: + description: key is the label key that the selector + applies to. + type: string + operator: + description: operator represents a key's relationship + to a set of values. Valid operators are In, NotIn, + Exists and DoesNotExist. + type: string + values: + description: values is an array of string values. + If the operator is In or NotIn, the values array + must be non-empty. 
If the operator is Exists or + DoesNotExist, the values array must be empty. This + array is replaced during a strategic merge patch. + items: + type: string + type: array + required: + - key + - operator + type: object + type: array + matchLabels: + additionalProperties: + type: string + description: matchLabels is a map of {key,value} pairs. + A single {key,value} in the matchLabels map is equivalent + to an element of matchExpressions, whose key field is + "key", the operator is "In", and the values array contains + only "value". The requirements are ANDed. + type: object + type: object + x-kubernetes-map-type: atomic + selector: + description: A Selector is a label query over a set of resources. + The result of matchLabels and matchExpressions are ANDed. + An empty Selector matches all objects. A null Selector matches + no objects. type: string succeeded: description: The number of pods which reached phase Succeeded. diff --git a/manifests/base/crds/kubeflow.org_xgboostjobs.yaml b/manifests/base/crds/kubeflow.org_xgboostjobs.yaml index 6f02824605..c10ee67f8e 100644 --- a/manifests/base/crds/kubeflow.org_xgboostjobs.yaml +++ b/manifests/base/crds/kubeflow.org_xgboostjobs.yaml @@ -82,6 +82,9 @@ spec: type: string queue: type: string + scheduleTimeoutSeconds: + format: int32 + type: integer type: object ttlSecondsAfterFinished: description: TTLSecondsAfterFinished is the TTL to clean up jobs. @@ -7462,10 +7465,55 @@ spec: format: int32 type: integer labelSelector: - description: A label selector is a label query over a set of - resources. The result of matchLabels and matchExpressions - are ANDed. An empty label selector matches all objects. A - null label selector matches no objects. + description: 'Deprecated: Use Selector instead' + properties: + matchExpressions: + description: matchExpressions is a list of label selector + requirements. The requirements are ANDed. + items: + description: A label selector requirement is a selector + that contains values, a key, and an operator that relates + the key and values. + properties: + key: + description: key is the label key that the selector + applies to. + type: string + operator: + description: operator represents a key's relationship + to a set of values. Valid operators are In, NotIn, + Exists and DoesNotExist. + type: string + values: + description: values is an array of string values. + If the operator is In or NotIn, the values array + must be non-empty. If the operator is Exists or + DoesNotExist, the values array must be empty. This + array is replaced during a strategic merge patch. + items: + type: string + type: array + required: + - key + - operator + type: object + type: array + matchLabels: + additionalProperties: + type: string + description: matchLabels is a map of {key,value} pairs. + A single {key,value} in the matchLabels map is equivalent + to an element of matchExpressions, whose key field is + "key", the operator is "In", and the values array contains + only "value". The requirements are ANDed. + type: object + type: object + x-kubernetes-map-type: atomic + selector: + description: A Selector is a label query over a set of resources. + The result of matchLabels and matchExpressions are ANDed. + An empty Selector matches all objects. A null Selector matches + no objects. type: string succeeded: description: The number of pods which reached phase Succeeded. 
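Note (not part of the patch): for reference, the `main.go` hunk at the top of this diff replaces the removed `--enable-gang-scheduling` boolean with a `--gang-scheduler-name` flag whose value selects a `common.GangSchedulingSetupFunc`. Condensed into a helper, the selection logic reads as below; factoring it into a function is a sketch, but every identifier (`GenNonGangSchedulerSetupFunc`, `GangSchedulerVolcano`, `GenVolcanoSetupFunc`, `GangSchedulerSchedulerPlugins`, `GenSchedulerPluginsSetupFunc`) appears in the diff itself.

```go
package main

import (
	"strings"

	"github.com/kubeflow/common/pkg/controller.v1/common"
	ctrl "sigs.k8s.io/controller-runtime"
	volcanoclient "volcano.sh/apis/pkg/client/clientset/versioned"
)

// gangSetupFunc maps --gang-scheduler-name to a setup func. "none"
// (the new default) and any unrecognized value fall through to the
// non-gang setup, so gang scheduling is now opt-in by scheduler name.
func gangSetupFunc(mgr ctrl.Manager, gangSchedulerName string) common.GangSchedulingSetupFunc {
	switch {
	case strings.EqualFold(gangSchedulerName, string(common.GangSchedulerVolcano)):
		return common.GenVolcanoSetupFunc(volcanoclient.NewForConfigOrDie(mgr.GetConfig()))
	case strings.EqualFold(gangSchedulerName, string(common.GangSchedulerSchedulerPlugins)):
		return common.GenSchedulerPluginsSetupFunc(mgr.GetClient())
	default:
		return common.GenNonGangSchedulerSetupFunc()
	}
}
```

Each reconciler below then runs the chosen func over its `JobController` (`gangSchedulingSetupFunc(&r.JobController)`), which populates `PodGroupControl`; that is why the controllers can call `jc.PodGroupControl.GetSchedulerName()` and `DecoratePodTemplateSpec(...)` instead of hard-coding the volcano scheduler name and PodGroup annotation.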
diff --git a/pkg/apis/kubeflow.org/v1/openapi_generated.go b/pkg/apis/kubeflow.org/v1/openapi_generated.go index be4f38d049..715eade7a4 100644 --- a/pkg/apis/kubeflow.org/v1/openapi_generated.go +++ b/pkg/apis/kubeflow.org/v1/openapi_generated.go @@ -247,7 +247,13 @@ func schema_pkg_apis_common_v1_ReplicaStatus(ref common.ReferenceCallback) commo }, "labelSelector": { SchemaProps: spec.SchemaProps{ - Description: "A label selector is a label query over a set of resources. The result of matchLabels and matchExpressions are ANDed. An empty label selector matches all objects. A null label selector matches no objects.", + Description: "Deprecated: Use Selector instead", + Ref: ref("k8s.io/apimachinery/pkg/apis/meta/v1.LabelSelector"), + }, + }, + "selector": { + SchemaProps: spec.SchemaProps{ + Description: "A Selector is a label query over a set of resources. The result of matchLabels and matchExpressions are ANDed. An empty Selector matches all objects. A null Selector matches no objects.", Type: []string{"string"}, Format: "", }, @@ -255,6 +261,8 @@ func schema_pkg_apis_common_v1_ReplicaStatus(ref common.ReferenceCallback) commo }, }, }, + Dependencies: []string{ + "k8s.io/apimachinery/pkg/apis/meta/v1.LabelSelector"}, } } @@ -346,6 +354,12 @@ func schema_pkg_apis_common_v1_SchedulingPolicy(ref common.ReferenceCallback) co Format: "", }, }, + "scheduleTimeoutSeconds": { + SchemaProps: spec.SchemaProps{ + Type: []string{"integer"}, + Format: "int32", + }, + }, }, }, }, diff --git a/pkg/apis/kubeflow.org/v1/paddlepaddle_types.go b/pkg/apis/kubeflow.org/v1/paddlepaddle_types.go index 67c11cc17e..5137cdb95c 100644 --- a/pkg/apis/kubeflow.org/v1/paddlepaddle_types.go +++ b/pkg/apis/kubeflow.org/v1/paddlepaddle_types.go @@ -51,7 +51,7 @@ const ( //+kubebuilder:subresource:status //+kubebuilder:printcolumn:name="State",type=string,JSONPath=`.status.conditions[-1:].type` //+kubebuilder:printcolumn:name="Age",type=date,JSONPath=`.metadata.creationTimestamp` -// +kubebuilder:subresource:scale:specpath=.spec.paddleReplicaSpecs.Worker.replicas,statuspath=.status.replicaStatuses.Worker.active,selectorpath=.status.replicaStatuses.Worker.labelSelector +// +kubebuilder:subresource:scale:specpath=.spec.paddleReplicaSpecs.Worker.replicas,statuspath=.status.replicaStatuses.Worker.active,selectorpath=.status.replicaStatuses.Worker.selector // PaddleJob Represents a PaddleJob resource. type PaddleJob struct { diff --git a/pkg/apis/kubeflow.org/v1/pytorch_types.go b/pkg/apis/kubeflow.org/v1/pytorch_types.go index 7a030bcf8d..bb04aea6e5 100644 --- a/pkg/apis/kubeflow.org/v1/pytorch_types.go +++ b/pkg/apis/kubeflow.org/v1/pytorch_types.go @@ -51,7 +51,7 @@ const ( //+kubebuilder:subresource:status //+kubebuilder:printcolumn:name="State",type=string,JSONPath=`.status.conditions[-1:].type` //+kubebuilder:printcolumn:name="Age",type=date,JSONPath=`.metadata.creationTimestamp` -// +kubebuilder:subresource:scale:specpath=.spec.pytorchReplicaSpecs.Worker.replicas,statuspath=.status.replicaStatuses.Worker.active,selectorpath=.status.replicaStatuses.Worker.labelSelector +// +kubebuilder:subresource:scale:specpath=.spec.pytorchReplicaSpecs.Worker.replicas,statuspath=.status.replicaStatuses.Worker.active,selectorpath=.status.replicaStatuses.Worker.selector // PyTorchJob Represents a PyTorchJob resource. 
type PyTorchJob struct { diff --git a/pkg/common/util/scheduler.go b/pkg/common/util/scheduler.go index c8863fe714..754d18b810 100644 --- a/pkg/common/util/scheduler.go +++ b/pkg/common/util/scheduler.go @@ -16,21 +16,12 @@ package util import commonv1 "github.com/kubeflow/common/pkg/apis/common/v1" -const ( - DefaultGangSchedulerName = "volcano" -) - func IsGangSchedulerSet(replicas map[commonv1.ReplicaType]*commonv1.ReplicaSpec, schedulerName string) bool { - if len(schedulerName) == 0 { - schedulerName = DefaultGangSchedulerName - } - for _, spec := range replicas { if spec.Template.Spec.SchedulerName != "" && spec.Template.Spec.SchedulerName == schedulerName { return true } } - return false } diff --git a/pkg/controller.v1/mpi/mpijob.go b/pkg/controller.v1/mpi/mpijob.go index 47fe863140..36dae7a1d8 100644 --- a/pkg/controller.v1/mpi/mpijob.go +++ b/pkg/controller.v1/mpi/mpijob.go @@ -18,11 +18,11 @@ import ( "strings" commonv1 "github.com/kubeflow/common/pkg/apis/common/v1" + kubeflowv1 "github.com/kubeflow/training-operator/pkg/apis/kubeflow.org/v1" + corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/labels" - - kubeflowv1 "github.com/kubeflow/training-operator/pkg/apis/kubeflow.org/v1" ) const ( @@ -69,14 +69,9 @@ const ( // policy is set in pod template. podTemplateRestartPolicyReason = "SettedPodTemplateRestartPolicy" - // gang scheduler name. - gangSchedulerName = "volcano" - // podTemplateSchedulerNameReason is the warning reason when other scheduler name is set // in pod templates with gang-scheduling enabled podTemplateSchedulerNameReason = "SettedPodTemplateSchedulerName" - // gangSchedulingPodGroupAnnotation is the annotation key used by batch schedulers - gangSchedulingPodGroupAnnotation = "scheduling.k8s.io/group-name" // volcanoTaskSpecKey task spec key used in pod annotation when EnableGangScheduling is true volcanoTaskSpecKey = "volcano.sh/task-spec" diff --git a/pkg/controller.v1/mpi/mpijob_controller.go b/pkg/controller.v1/mpi/mpijob_controller.go index be2a60acbe..fabb3e46da 100644 --- a/pkg/controller.v1/mpi/mpijob_controller.go +++ b/pkg/controller.v1/mpi/mpijob_controller.go @@ -25,12 +25,6 @@ import ( "time" "github.com/go-logr/logr" - commonv1 "github.com/kubeflow/common/pkg/apis/common/v1" - "github.com/kubeflow/common/pkg/controller.v1/common" - "github.com/kubeflow/common/pkg/controller.v1/control" - "github.com/kubeflow/common/pkg/controller.v1/expectation" - commonutil "github.com/kubeflow/common/pkg/util" - "github.com/kubeflow/training-operator/pkg/common/util" "github.com/sirupsen/logrus" corev1 "k8s.io/api/core/v1" rbacv1 "k8s.io/api/rbac/v1" @@ -53,11 +47,17 @@ import ( "sigs.k8s.io/controller-runtime/pkg/manager" "sigs.k8s.io/controller-runtime/pkg/predicate" "sigs.k8s.io/controller-runtime/pkg/source" + schedulerpluginsv1alpha1 "sigs.k8s.io/scheduler-plugins/apis/scheduling/v1alpha1" "volcano.sh/apis/pkg/apis/scheduling/v1beta1" - volcanoclient "volcano.sh/apis/pkg/client/clientset/versioned" + commonv1 "github.com/kubeflow/common/pkg/apis/common/v1" + "github.com/kubeflow/common/pkg/controller.v1/common" + "github.com/kubeflow/common/pkg/controller.v1/control" + "github.com/kubeflow/common/pkg/controller.v1/expectation" + commonutil "github.com/kubeflow/common/pkg/util" kubeflowv1 "github.com/kubeflow/training-operator/pkg/apis/kubeflow.org/v1" 
trainingoperatorcommon "github.com/kubeflow/training-operator/pkg/common" + "github.com/kubeflow/training-operator/pkg/common/util" ctlrconfig "github.com/kubeflow/training-operator/pkg/config" ) @@ -69,7 +69,7 @@ const ( labelMPIJobName = "mpi-job-name" ) -func NewReconciler(mgr manager.Manager, enableGangScheduling bool) *MPIJobReconciler { +func NewReconciler(mgr manager.Manager, gangSchedulingSetupFunc common.GangSchedulingSetupFunc) *MPIJobReconciler { r := &MPIJobReconciler{ Client: mgr.GetClient(), Scheme: mgr.GetScheme(), @@ -80,23 +80,23 @@ func NewReconciler(mgr manager.Manager, enableGangScheduling bool) *MPIJobReconc cfg := mgr.GetConfig() kubeClientSet := kubeclientset.NewForConfigOrDie(cfg) - volcanoClientSet := volcanoclient.NewForConfigOrDie(cfg) sharedInformers := informers.NewSharedInformerFactory(kubeClientSet, 0) - priorityClassInformer := sharedInformers.Scheduling().V1beta1().PriorityClasses() + priorityClassInformer := sharedInformers.Scheduling().V1().PriorityClasses() r.JobController = common.JobController{ Controller: r, Expectations: expectation.NewControllerExpectations(), - Config: common.JobControllerConfiguration{EnableGangScheduling: enableGangScheduling}, WorkQueue: &util.FakeWorkQueue{}, Recorder: r.recorder, KubeClientSet: kubeClientSet, - VolcanoClientSet: volcanoClientSet, PriorityClassLister: priorityClassInformer.Lister(), PriorityClassInformerSynced: priorityClassInformer.Informer().HasSynced, PodControl: control.RealPodControl{KubeClient: kubeClientSet, Recorder: r.recorder}, ServiceControl: control.RealServiceControl{KubeClient: kubeClientSet, Recorder: r.recorder}, } + + gangSchedulingSetupFunc(&r.JobController) + return r } @@ -244,11 +244,11 @@ func (jc *MPIJobReconciler) SetupWithManager(mgr ctrl.Manager, controllerThreads return err } - // skip watching podgroup if PodGroup is not installed + // skip watching volcano PodGroup if volcano PodGroup is not installed _, err = mgr.GetRESTMapper().RESTMapping(schema.GroupKind{Group: v1beta1.SchemeGroupVersion.Group, Kind: "PodGroup"}, v1beta1.SchemeGroupVersion.Version) if err == nil { - // inject watching for job related PodGroup + // inject watching for job related volcano PodGroup if err = c.Watch(&source.Kind{Type: &v1beta1.PodGroup{}}, &handler.EnqueueRequestForOwner{ IsController: true, OwnerType: &kubeflowv1.MPIJob{}, @@ -257,6 +257,20 @@ func (jc *MPIJobReconciler) SetupWithManager(mgr ctrl.Manager, controllerThreads } } + // skip watching scheduler-plugins PodGroup if scheduler-plugins PodGroup is not installed + _, err = mgr.GetRESTMapper().RESTMapping( + schema.GroupKind{Group: schedulerpluginsv1alpha1.SchemeGroupVersion.Group, Kind: "PodGroup"}, + schedulerpluginsv1alpha1.SchemeGroupVersion.Version) + if err == nil { + // inject watching for job related scheduler-plugins PodGroup + if err = c.Watch(&source.Kind{Type: &schedulerpluginsv1alpha1.PodGroup{}}, &handler.EnqueueRequestForOwner{ + IsController: true, + OwnerType: &kubeflowv1.MPIJob{}, + }, predicates); err != nil { + return err + } + } + return nil } @@ -1000,20 +1014,18 @@ func (jc *MPIJobReconciler) newWorker(mpiJob *kubeflowv1.MPIJob, name string) *c // if gang-scheduling is enabled: // 1. if user has specified other scheduler, we report a warning without overriding any fields. // 2. if no SchedulerName is set for pods, then we set the SchedulerName to "volcano". 
- if jc.Config.EnableGangScheduling { - if util.IsGangSchedulerSet(mpiJob.Spec.MPIReplicaSpecs, gangSchedulerName) { + if jc.Config.EnableGangScheduling() { + if !util.IsGangSchedulerSet(mpiJob.Spec.MPIReplicaSpecs, jc.PodGroupControl.GetSchedulerName()) { errMsg := "Another scheduler is specified when gang-scheduling is enabled and it will not be overwritten" logger.Warning(errMsg) jc.Recorder.Event(mpiJob, corev1.EventTypeWarning, podTemplateSchedulerNameReason, errMsg) - } else { - podSpec.Spec.SchedulerName = gangSchedulerName } - if podSpec.Annotations == nil { - podSpec.Annotations = map[string]string{} + rtWorker := strings.ToLower(string(kubeflowv1.MPIJobReplicaTypeWorker)) + jc.PodGroupControl.DecoratePodTemplateSpec(podSpec, mpiJob, rtWorker) + if jc.PodGroupControl.GetSchedulerName() == "volcano" { + podSpec.Annotations[volcanoTaskSpecKey] = rtWorker } - podSpec.Annotations[gangSchedulingPodGroupAnnotation] = mpiJob.GetName() - podSpec.Annotations[volcanoTaskSpecKey] = strings.ToLower(string(kubeflowv1.MPIJobReplicaTypeWorker)) } return &corev1.Pod{ @@ -1054,20 +1066,18 @@ func (jc *MPIJobReconciler) newLauncher(mpiJob *kubeflowv1.MPIJob, kubectlDelive logger := commonutil.LoggerForReplica(mpiJob, strings.ToLower(string(kubeflowv1.MPIJobReplicaTypeLauncher))) // add SchedulerName to podSpec - if jc.Config.EnableGangScheduling { - if util.IsGangSchedulerSet(mpiJob.Spec.MPIReplicaSpecs, gangSchedulerName) { + if jc.Config.EnableGangScheduling() { + if !util.IsGangSchedulerSet(mpiJob.Spec.MPIReplicaSpecs, jc.PodGroupControl.GetSchedulerName()) { errMsg := "Another scheduler is specified when gang-scheduling is enabled and it will not be overwritten" logger.Warning(errMsg) jc.Recorder.Event(mpiJob, corev1.EventTypeWarning, podTemplateSchedulerNameReason, errMsg) - } else { - podSpec.Spec.SchedulerName = gangSchedulerName } - if podSpec.Annotations == nil { - podSpec.Annotations = map[string]string{} + rt := strings.ToLower(string(kubeflowv1.MPIJobReplicaTypeLauncher)) + jc.PodGroupControl.DecoratePodTemplateSpec(podSpec, mpiJob, rt) + if jc.PodGroupControl.GetSchedulerName() == "volcano" { + podSpec.Annotations[volcanoTaskSpecKey] = rt } - podSpec.Annotations[gangSchedulingPodGroupAnnotation] = mpiJob.GetName() - podSpec.Annotations[volcanoTaskSpecKey] = strings.ToLower(string(kubeflowv1.MPIJobReplicaTypeLauncher)) } podSpec.Spec.ServiceAccountName = launcherName diff --git a/pkg/controller.v1/mpi/suite_test.go b/pkg/controller.v1/mpi/suite_test.go index 1f25f00af0..01cd941e43 100644 --- a/pkg/controller.v1/mpi/suite_test.go +++ b/pkg/controller.v1/mpi/suite_test.go @@ -19,19 +19,19 @@ import ( "path/filepath" "testing" + "github.com/kubeflow/common/pkg/controller.v1/common" kubeflowv1 "github.com/kubeflow/training-operator/pkg/apis/kubeflow.org/v1" "github.com/kubeflow/training-operator/pkg/config" . "github.com/onsi/ginkgo/v2" . 
"github.com/onsi/gomega" "k8s.io/client-go/kubernetes/scheme" - ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/envtest" logf "sigs.k8s.io/controller-runtime/pkg/log" "sigs.k8s.io/controller-runtime/pkg/log/zap" - v1beta1 "volcano.sh/apis/pkg/apis/scheduling/v1beta1" + "volcano.sh/apis/pkg/apis/scheduling/v1beta1" //+kubebuilder:scaffold:imports ) @@ -86,7 +86,8 @@ var _ = BeforeSuite(func() { }) Expect(err).NotTo(HaveOccurred()) - reconciler = NewReconciler(mgr, false) + gangSchedulingSetupFunc := common.GenNonGangSchedulerSetupFunc() + reconciler = NewReconciler(mgr, gangSchedulingSetupFunc) Expect(reconciler.SetupWithManager(mgr, 1)).NotTo(HaveOccurred()) go func() { diff --git a/pkg/controller.v1/mxnet/mxjob_controller.go b/pkg/controller.v1/mxnet/mxjob_controller.go index 84cb1c6e2f..3a0e4e9171 100644 --- a/pkg/controller.v1/mxnet/mxjob_controller.go +++ b/pkg/controller.v1/mxnet/mxjob_controller.go @@ -20,12 +20,16 @@ import ( "reflect" "time" - "github.com/go-logr/logr" commonv1 "github.com/kubeflow/common/pkg/apis/common/v1" "github.com/kubeflow/common/pkg/controller.v1/common" "github.com/kubeflow/common/pkg/controller.v1/control" "github.com/kubeflow/common/pkg/controller.v1/expectation" commonutil "github.com/kubeflow/common/pkg/util" + kubeflowv1 "github.com/kubeflow/training-operator/pkg/apis/kubeflow.org/v1" + trainingoperatorcommon "github.com/kubeflow/training-operator/pkg/common" + "github.com/kubeflow/training-operator/pkg/common/util" + + "github.com/go-logr/logr" "github.com/sirupsen/logrus" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/errors" @@ -47,12 +51,8 @@ import ( "sigs.k8s.io/controller-runtime/pkg/manager" "sigs.k8s.io/controller-runtime/pkg/predicate" "sigs.k8s.io/controller-runtime/pkg/source" + schedulerpluginsv1alpha1 "sigs.k8s.io/scheduler-plugins/apis/scheduling/v1alpha1" "volcano.sh/apis/pkg/apis/scheduling/v1beta1" - volcanoclient "volcano.sh/apis/pkg/client/clientset/versioned" - - kubeflowv1 "github.com/kubeflow/training-operator/pkg/apis/kubeflow.org/v1" - trainingoperatorcommon "github.com/kubeflow/training-operator/pkg/common" - "github.com/kubeflow/training-operator/pkg/common/util" ) const ( @@ -69,7 +69,7 @@ const ( ) // NewReconciler creates a MXJob Reconciler -func NewReconciler(mgr manager.Manager, enableGangScheduling bool) *MXJobReconciler { +func NewReconciler(mgr manager.Manager, gangSchedulingSetupFunc common.GangSchedulingSetupFunc) *MXJobReconciler { r := &MXJobReconciler{ Client: mgr.GetClient(), Scheme: mgr.GetScheme(), @@ -81,25 +81,24 @@ func NewReconciler(mgr manager.Manager, enableGangScheduling bool) *MXJobReconci // Create clients. 
cfg := mgr.GetConfig() kubeClientSet := kubeclientset.NewForConfigOrDie(cfg) - volcanoClientSet := volcanoclient.NewForConfigOrDie(cfg) sharedInformers := informers.NewSharedInformerFactory(kubeClientSet, 0) - priorityClassInformer := sharedInformers.Scheduling().V1beta1().PriorityClasses() + priorityClassInformer := sharedInformers.Scheduling().V1().PriorityClasses() // Initialize common job controller r.JobController = common.JobController{ Controller: r, Expectations: expectation.NewControllerExpectations(), - Config: common.JobControllerConfiguration{EnableGangScheduling: enableGangScheduling}, WorkQueue: &util.FakeWorkQueue{}, Recorder: r.Recorder, KubeClientSet: kubeClientSet, - VolcanoClientSet: volcanoClientSet, PriorityClassLister: priorityClassInformer.Lister(), PriorityClassInformerSynced: priorityClassInformer.Informer().HasSynced, PodControl: control.RealPodControl{KubeClient: kubeClientSet, Recorder: r.Recorder}, ServiceControl: control.RealServiceControl{KubeClient: kubeClientSet, Recorder: r.Recorder}, } + gangSchedulingSetupFunc(&r.JobController) + return r } @@ -218,11 +217,11 @@ func (r *MXJobReconciler) SetupWithManager(mgr ctrl.Manager, controllerThreads i return err } - // skip watching podgroup if podgroup is not installed + // skip watching volcano PodGroup if volcano PodGroup is not installed _, err = mgr.GetRESTMapper().RESTMapping(schema.GroupKind{Group: v1beta1.SchemeGroupVersion.Group, Kind: "PodGroup"}, v1beta1.SchemeGroupVersion.Version) if err == nil { - // inject watching for job related podgroup + // inject watching for job related volcano PodGroup if err = c.Watch(&source.Kind{Type: &v1beta1.PodGroup{}}, &handler.EnqueueRequestForOwner{ IsController: true, OwnerType: &kubeflowv1.MXJob{}, @@ -235,6 +234,24 @@ func (r *MXJobReconciler) SetupWithManager(mgr ctrl.Manager, controllerThreads i } } + // skip watching scheduler-plugins PodGroup if scheduler-plugins PodGroup is not installed + _, err = mgr.GetRESTMapper().RESTMapping( + schema.GroupKind{Group: schedulerpluginsv1alpha1.SchemeGroupVersion.Group, Kind: "PodGroup"}, + schedulerpluginsv1alpha1.SchemeGroupVersion.Version) + if err == nil { + // inject watching for job related scheduler-plugins PodGroup + if err = c.Watch(&source.Kind{Type: &schedulerpluginsv1alpha1.PodGroup{}}, &handler.EnqueueRequestForOwner{ + IsController: true, + OwnerType: &kubeflowv1.MXJob{}, + }, predicate.Funcs{ + CreateFunc: util.OnDependentCreateFuncGeneric(r.Expectations), + UpdateFunc: util.OnDependentUpdateFuncGeneric(&r.JobController), + DeleteFunc: util.OnDependentDeleteFuncGeneric(r.Expectations), + }); err != nil { + return err + } + } + return nil } diff --git a/pkg/controller.v1/mxnet/mxnet.go b/pkg/controller.v1/mxnet/mxnet.go index 91b26119c6..26e4240388 100644 --- a/pkg/controller.v1/mxnet/mxnet.go +++ b/pkg/controller.v1/mxnet/mxnet.go @@ -22,9 +22,9 @@ import ( commonv1 "github.com/kubeflow/common/pkg/apis/common/v1" "github.com/kubeflow/common/pkg/controller.v1/common" - corev1 "k8s.io/api/core/v1" - kubeflowv1 "github.com/kubeflow/training-operator/pkg/apis/kubeflow.org/v1" + + corev1 "k8s.io/api/core/v1" ) const ( diff --git a/pkg/controller.v1/mxnet/suite_test.go b/pkg/controller.v1/mxnet/suite_test.go index e78479fe7d..df09b331c3 100644 --- a/pkg/controller.v1/mxnet/suite_test.go +++ b/pkg/controller.v1/mxnet/suite_test.go @@ -27,7 +27,7 @@ import ( "sigs.k8s.io/controller-runtime/pkg/envtest" logf "sigs.k8s.io/controller-runtime/pkg/log" "sigs.k8s.io/controller-runtime/pkg/log/zap" - 
v1beta1 "volcano.sh/apis/pkg/apis/scheduling/v1beta1" + "volcano.sh/apis/pkg/apis/scheduling/v1beta1" //+kubebuilder:scaffold:imports ) diff --git a/pkg/controller.v1/paddlepaddle/paddlepaddle_controller.go b/pkg/controller.v1/paddlepaddle/paddlepaddle_controller.go index ef0a4ad3d8..ecea63a31d 100644 --- a/pkg/controller.v1/paddlepaddle/paddlepaddle_controller.go +++ b/pkg/controller.v1/paddlepaddle/paddlepaddle_controller.go @@ -20,12 +20,16 @@ import ( "strings" "time" - "github.com/go-logr/logr" commonv1 "github.com/kubeflow/common/pkg/apis/common/v1" "github.com/kubeflow/common/pkg/controller.v1/common" "github.com/kubeflow/common/pkg/controller.v1/control" "github.com/kubeflow/common/pkg/controller.v1/expectation" commonutil "github.com/kubeflow/common/pkg/util" + kubeflowv1 "github.com/kubeflow/training-operator/pkg/apis/kubeflow.org/v1" + trainingoperatorcommon "github.com/kubeflow/training-operator/pkg/common" + "github.com/kubeflow/training-operator/pkg/common/util" + + "github.com/go-logr/logr" "github.com/sirupsen/logrus" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/equality" @@ -48,12 +52,8 @@ import ( "sigs.k8s.io/controller-runtime/pkg/manager" "sigs.k8s.io/controller-runtime/pkg/predicate" "sigs.k8s.io/controller-runtime/pkg/source" + schedulerpluginsv1alpha1 "sigs.k8s.io/scheduler-plugins/apis/scheduling/v1alpha1" "volcano.sh/apis/pkg/apis/scheduling/v1beta1" - volcanoclient "volcano.sh/apis/pkg/client/clientset/versioned" - - kubeflowv1 "github.com/kubeflow/training-operator/pkg/apis/kubeflow.org/v1" - trainingoperatorcommon "github.com/kubeflow/training-operator/pkg/common" - "github.com/kubeflow/training-operator/pkg/common/util" ) const ( @@ -61,7 +61,7 @@ const ( ) // NewReconciler creates a PaddleJob Reconciler -func NewReconciler(mgr manager.Manager, enableGangScheduling bool) *PaddleJobReconciler { +func NewReconciler(mgr manager.Manager, gangSchedulingSetupFunc common.GangSchedulingSetupFunc) *PaddleJobReconciler { r := &PaddleJobReconciler{ Client: mgr.GetClient(), Scheme: mgr.GetScheme(), @@ -73,25 +73,24 @@ func NewReconciler(mgr manager.Manager, enableGangScheduling bool) *PaddleJobRec // Create clients cfg := mgr.GetConfig() kubeClientSet := kubeclientset.NewForConfigOrDie(cfg) - volcanoClientSet := volcanoclient.NewForConfigOrDie(cfg) sharedInformers := informers.NewSharedInformerFactory(kubeClientSet, 0) - priorityClassInformer := sharedInformers.Scheduling().V1beta1().PriorityClasses() + priorityClassInformer := sharedInformers.Scheduling().V1().PriorityClasses() // Initialize common job controller r.JobController = common.JobController{ Controller: r, Expectations: expectation.NewControllerExpectations(), - Config: common.JobControllerConfiguration{EnableGangScheduling: enableGangScheduling}, WorkQueue: &util.FakeWorkQueue{}, Recorder: r.recorder, KubeClientSet: kubeClientSet, - VolcanoClientSet: volcanoClientSet, PriorityClassLister: priorityClassInformer.Lister(), PriorityClassInformerSynced: priorityClassInformer.Informer().HasSynced, PodControl: control.RealPodControl{KubeClient: kubeClientSet, Recorder: r.recorder}, ServiceControl: control.RealServiceControl{KubeClient: kubeClientSet, Recorder: r.recorder}, } + gangSchedulingSetupFunc(&r.JobController) + return r } @@ -213,11 +212,11 @@ func (r *PaddleJobReconciler) SetupWithManager(mgr ctrl.Manager, controllerThrea return err } - // skip watching podgroup if podgroup is not 
installed + // skip watching volcano PodGroup if volcano PodGroup is not installed _, err = mgr.GetRESTMapper().RESTMapping(schema.GroupKind{Group: v1beta1.SchemeGroupVersion.Group, Kind: "PodGroup"}, v1beta1.SchemeGroupVersion.Version) if err == nil { - // inject watching for job related podgroup + // inject watching for job related volcano PodGroup if err = c.Watch(&source.Kind{Type: &v1beta1.PodGroup{}}, &handler.EnqueueRequestForOwner{ IsController: true, OwnerType: &kubeflowv1.PaddleJob{}, @@ -230,6 +229,24 @@ func (r *PaddleJobReconciler) SetupWithManager(mgr ctrl.Manager, controllerThrea } } + // skip watching scheduler-plugins PodGroup if scheduler-plugins PodGroup is not installed + _, err = mgr.GetRESTMapper().RESTMapping( + schema.GroupKind{Group: schedulerpluginsv1alpha1.SchemeGroupVersion.Group, Kind: "PodGroup"}, + schedulerpluginsv1alpha1.SchemeGroupVersion.Version) + if err == nil { + // inject watching for job related scheduler-plugins PodGroup + if err = c.Watch(&source.Kind{Type: &schedulerpluginsv1alpha1.PodGroup{}}, &handler.EnqueueRequestForOwner{ + IsController: true, + OwnerType: &kubeflowv1.PaddleJob{}, + }, predicate.Funcs{ + CreateFunc: util.OnDependentCreateFuncGeneric(r.Expectations), + UpdateFunc: util.OnDependentUpdateFuncGeneric(&r.JobController), + DeleteFunc: util.OnDependentDeleteFuncGeneric(r.Expectations), + }); err != nil { + return err + } + } + return nil } @@ -370,7 +387,7 @@ func (r *PaddleJobReconciler) UpdateJobStatus(job interface{}, for rtype, spec := range replicas { status := jobStatus.ReplicaStatuses[rtype] // Generate the label selector. - status.LabelSelector = metav1.FormatLabelSelector(r.GenLabelSelector(paddlejob.Name, rtype)) + status.Selector = metav1.FormatLabelSelector(r.GenLabelSelector(paddlejob.Name, rtype)) succeeded := status.Succeeded expected := *(spec.Replicas) - succeeded diff --git a/pkg/controller.v1/paddlepaddle/paddlepaddle_controller_suite_test.go b/pkg/controller.v1/paddlepaddle/paddlepaddle_controller_suite_test.go index 6c21b028b7..aebb63e3c9 100644 --- a/pkg/controller.v1/paddlepaddle/paddlepaddle_controller_suite_test.go +++ b/pkg/controller.v1/paddlepaddle/paddlepaddle_controller_suite_test.go @@ -19,6 +19,7 @@ import ( "path/filepath" "testing" + "github.com/kubeflow/common/pkg/controller.v1/common" kubeflowv1 "github.com/kubeflow/training-operator/pkg/apis/kubeflow.org/v1" . 
"github.com/onsi/ginkgo/v2" @@ -30,7 +31,7 @@ import ( "sigs.k8s.io/controller-runtime/pkg/envtest" logf "sigs.k8s.io/controller-runtime/pkg/log" "sigs.k8s.io/controller-runtime/pkg/log/zap" - v1beta1 "volcano.sh/apis/pkg/apis/scheduling/v1beta1" + "volcano.sh/apis/pkg/apis/scheduling/v1beta1" //+kubebuilder:scaffold:imports ) @@ -81,7 +82,8 @@ var _ = BeforeSuite(func() { }) Expect(err).NotTo(gomega.HaveOccurred()) - r := NewReconciler(mgr, false) + gangSchedulingSetupFunc := common.GenNonGangSchedulerSetupFunc() + r := NewReconciler(mgr, gangSchedulingSetupFunc) Expect(r.SetupWithManager(mgr, 1)).NotTo(gomega.HaveOccurred()) diff --git a/pkg/controller.v1/pytorch/pytorchjob_controller.go b/pkg/controller.v1/pytorch/pytorchjob_controller.go index a19de42aca..ca664e4173 100644 --- a/pkg/controller.v1/pytorch/pytorchjob_controller.go +++ b/pkg/controller.v1/pytorch/pytorchjob_controller.go @@ -20,12 +20,16 @@ import ( "strings" "time" - "github.com/go-logr/logr" commonv1 "github.com/kubeflow/common/pkg/apis/common/v1" "github.com/kubeflow/common/pkg/controller.v1/common" "github.com/kubeflow/common/pkg/controller.v1/control" "github.com/kubeflow/common/pkg/controller.v1/expectation" commonutil "github.com/kubeflow/common/pkg/util" + kubeflowv1 "github.com/kubeflow/training-operator/pkg/apis/kubeflow.org/v1" + trainingoperatorcommon "github.com/kubeflow/training-operator/pkg/common" + "github.com/kubeflow/training-operator/pkg/common/util" + + "github.com/go-logr/logr" "github.com/sirupsen/logrus" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/equality" @@ -48,12 +52,8 @@ import ( "sigs.k8s.io/controller-runtime/pkg/manager" "sigs.k8s.io/controller-runtime/pkg/predicate" "sigs.k8s.io/controller-runtime/pkg/source" + schedulerpluginsv1alpha1 "sigs.k8s.io/scheduler-plugins/apis/scheduling/v1alpha1" "volcano.sh/apis/pkg/apis/scheduling/v1beta1" - volcanoclient "volcano.sh/apis/pkg/client/clientset/versioned" - - kubeflowv1 "github.com/kubeflow/training-operator/pkg/apis/kubeflow.org/v1" - trainingoperatorcommon "github.com/kubeflow/training-operator/pkg/common" - "github.com/kubeflow/training-operator/pkg/common/util" ) const ( @@ -61,7 +61,7 @@ const ( ) // NewReconciler creates a PyTorchJob Reconciler -func NewReconciler(mgr manager.Manager, enableGangScheduling bool) *PyTorchJobReconciler { +func NewReconciler(mgr manager.Manager, gangSchedulingSetupFunc common.GangSchedulingSetupFunc) *PyTorchJobReconciler { r := &PyTorchJobReconciler{ Client: mgr.GetClient(), Scheme: mgr.GetScheme(), @@ -73,25 +73,24 @@ func NewReconciler(mgr manager.Manager, enableGangScheduling bool) *PyTorchJobRe // Create clients cfg := mgr.GetConfig() kubeClientSet := kubeclientset.NewForConfigOrDie(cfg) - volcanoClientSet := volcanoclient.NewForConfigOrDie(cfg) sharedInformers := informers.NewSharedInformerFactory(kubeClientSet, 0) - priorityClassInformer := sharedInformers.Scheduling().V1beta1().PriorityClasses() + priorityClassInformer := sharedInformers.Scheduling().V1().PriorityClasses() // Initialize common job controller r.JobController = common.JobController{ Controller: r, Expectations: expectation.NewControllerExpectations(), - Config: common.JobControllerConfiguration{EnableGangScheduling: enableGangScheduling}, WorkQueue: &util.FakeWorkQueue{}, Recorder: r.recorder, KubeClientSet: kubeClientSet, - VolcanoClientSet: volcanoClientSet, PriorityClassLister: 
priorityClassInformer.Lister(), PriorityClassInformerSynced: priorityClassInformer.Informer().HasSynced, PodControl: control.RealPodControl{KubeClient: kubeClientSet, Recorder: r.recorder}, ServiceControl: control.RealServiceControl{KubeClient: kubeClientSet, Recorder: r.recorder}, } + gangSchedulingSetupFunc(&r.JobController) + return r } @@ -216,11 +215,12 @@ func (r *PyTorchJobReconciler) SetupWithManager(mgr ctrl.Manager, controllerThre }); err != nil { return err } - // skip watching podgroup if podgroup is not installed + + // skip watching volcano PodGroup if volcano PodGroup is not installed _, err = mgr.GetRESTMapper().RESTMapping(schema.GroupKind{Group: v1beta1.SchemeGroupVersion.Group, Kind: "PodGroup"}, v1beta1.SchemeGroupVersion.Version) if err == nil { - // inject watching for job related podgroup + // inject watching for job related volcano PodGroup if err = c.Watch(&source.Kind{Type: &v1beta1.PodGroup{}}, &handler.EnqueueRequestForOwner{ IsController: true, OwnerType: &kubeflowv1.PyTorchJob{}, @@ -233,6 +233,24 @@ func (r *PyTorchJobReconciler) SetupWithManager(mgr ctrl.Manager, controllerThre } } + // skip watching scheduler-plugins PodGroup if scheduler-plugins PodGroup is not installed + _, err = mgr.GetRESTMapper().RESTMapping( + schema.GroupKind{Group: schedulerpluginsv1alpha1.SchemeGroupVersion.Group, Kind: "PodGroup"}, + schedulerpluginsv1alpha1.SchemeGroupVersion.Version) + if err == nil { + // inject watching for job related scheduler-plugins PodGroup + if err = c.Watch(&source.Kind{Type: &schedulerpluginsv1alpha1.PodGroup{}}, &handler.EnqueueRequestForOwner{ + IsController: true, + OwnerType: &kubeflowv1.PyTorchJob{}, + }, predicate.Funcs{ + CreateFunc: util.OnDependentCreateFuncGeneric(r.Expectations), + UpdateFunc: util.OnDependentUpdateFuncGeneric(&r.JobController), + DeleteFunc: util.OnDependentDeleteFuncGeneric(r.Expectations), + }); err != nil { + return err + } + } + return nil } @@ -372,7 +390,7 @@ func (r *PyTorchJobReconciler) UpdateJobStatus(job interface{}, for rtype, spec := range replicas { status := jobStatus.ReplicaStatuses[rtype] // Generate the label selector. 
- status.LabelSelector = metav1.FormatLabelSelector(r.GenLabelSelector(pytorchjob.Name, rtype)) + status.Selector = metav1.FormatLabelSelector(r.GenLabelSelector(pytorchjob.Name, rtype)) succeeded := status.Succeeded expected := *(spec.Replicas) - succeeded diff --git a/pkg/controller.v1/pytorch/pytorchjob_controller_suite_test.go b/pkg/controller.v1/pytorch/pytorchjob_controller_suite_test.go index 6111d04e55..301f869cde 100644 --- a/pkg/controller.v1/pytorch/pytorchjob_controller_suite_test.go +++ b/pkg/controller.v1/pytorch/pytorchjob_controller_suite_test.go @@ -19,6 +19,7 @@ import ( "path/filepath" "testing" + "github.com/kubeflow/common/pkg/controller.v1/common" kubeflowv1 "github.com/kubeflow/training-operator/pkg/apis/kubeflow.org/v1" "github.com/kubeflow/training-operator/pkg/config" @@ -31,7 +32,7 @@ import ( "sigs.k8s.io/controller-runtime/pkg/envtest" logf "sigs.k8s.io/controller-runtime/pkg/log" "sigs.k8s.io/controller-runtime/pkg/log/zap" - v1beta1 "volcano.sh/apis/pkg/apis/scheduling/v1beta1" + "volcano.sh/apis/pkg/apis/scheduling/v1beta1" //+kubebuilder:scaffold:imports ) @@ -86,7 +87,8 @@ var _ = BeforeSuite(func() { }) Expect(err).NotTo(gomega.HaveOccurred()) - r := NewReconciler(mgr, false) + gangSchedulingSetupFunc := common.GenNonGangSchedulerSetupFunc() + r := NewReconciler(mgr, gangSchedulingSetupFunc) Expect(r.SetupWithManager(mgr, 1)).NotTo(gomega.HaveOccurred()) diff --git a/pkg/controller.v1/register_controller.go b/pkg/controller.v1/register_controller.go index bbdd0af108..2d8e446367 100644 --- a/pkg/controller.v1/register_controller.go +++ b/pkg/controller.v1/register_controller.go @@ -18,8 +18,7 @@ import ( "fmt" "strings" - "sigs.k8s.io/controller-runtime/pkg/manager" - + "github.com/kubeflow/common/pkg/controller.v1/common" kubeflowv1 "github.com/kubeflow/training-operator/pkg/apis/kubeflow.org/v1" mpicontroller "github.com/kubeflow/training-operator/pkg/controller.v1/mpi" mxnetcontroller "github.com/kubeflow/training-operator/pkg/controller.v1/mxnet" @@ -27,30 +26,32 @@ import ( pytorchcontroller "github.com/kubeflow/training-operator/pkg/controller.v1/pytorch" tensorflowcontroller "github.com/kubeflow/training-operator/pkg/controller.v1/tensorflow" xgboostcontroller "github.com/kubeflow/training-operator/pkg/controller.v1/xgboost" + + "sigs.k8s.io/controller-runtime/pkg/manager" ) const ErrTemplateSchemeNotSupported = "scheme %s is not supported yet" -type ReconcilerSetupFunc func(manager manager.Manager, enableGangScheduling bool, controllerThreads int) error +type ReconcilerSetupFunc func(manager manager.Manager, gangSchedulingSetupFunc common.GangSchedulingSetupFunc, controllerThreads int) error var SupportedSchemeReconciler = map[string]ReconcilerSetupFunc{ - kubeflowv1.TFJobKind: func(mgr manager.Manager, enableGangScheduling bool, controllerThreads int) error { - return tensorflowcontroller.NewReconciler(mgr, enableGangScheduling).SetupWithManager(mgr, controllerThreads) + kubeflowv1.TFJobKind: func(mgr manager.Manager, gangSchedulingSetupFunc common.GangSchedulingSetupFunc, controllerThreads int) error { + return tensorflowcontroller.NewReconciler(mgr, gangSchedulingSetupFunc).SetupWithManager(mgr, controllerThreads) }, - kubeflowv1.PytorchJobKind: func(mgr manager.Manager, enableGangScheduling bool, controllerThreads int) error { - return pytorchcontroller.NewReconciler(mgr, enableGangScheduling).SetupWithManager(mgr, controllerThreads) + kubeflowv1.PytorchJobKind: 
func(mgr manager.Manager, gangSchedulingSetupFunc common.GangSchedulingSetupFunc, controllerThreads int) error { + return pytorchcontroller.NewReconciler(mgr, gangSchedulingSetupFunc).SetupWithManager(mgr, controllerThreads) }, - kubeflowv1.MXJobKind: func(mgr manager.Manager, enableGangScheduling bool, controllerThreads int) error { - return mxnetcontroller.NewReconciler(mgr, enableGangScheduling).SetupWithManager(mgr, controllerThreads) + kubeflowv1.MXJobKind: func(mgr manager.Manager, gangSchedulingSetupFunc common.GangSchedulingSetupFunc, controllerThreads int) error { + return mxnetcontroller.NewReconciler(mgr, gangSchedulingSetupFunc).SetupWithManager(mgr, controllerThreads) }, - kubeflowv1.XGBoostJobKind: func(mgr manager.Manager, enableGangScheduling bool, controllerThreads int) error { - return xgboostcontroller.NewReconciler(mgr, enableGangScheduling).SetupWithManager(mgr, controllerThreads) + kubeflowv1.XGBoostJobKind: func(mgr manager.Manager, gangSchedulingSetupFunc common.GangSchedulingSetupFunc, controllerThreads int) error { + return xgboostcontroller.NewReconciler(mgr, gangSchedulingSetupFunc).SetupWithManager(mgr, controllerThreads) }, - kubeflowv1.MPIJobKind: func(mgr manager.Manager, enableGangScheduling bool, controllerThreads int) error { - return mpicontroller.NewReconciler(mgr, enableGangScheduling).SetupWithManager(mgr, controllerThreads) + kubeflowv1.MPIJobKind: func(mgr manager.Manager, gangSchedulingSetupFunc common.GangSchedulingSetupFunc, controllerThreads int) error { + return mpicontroller.NewReconciler(mgr, gangSchedulingSetupFunc).SetupWithManager(mgr, controllerThreads) }, - kubeflowv1.PaddleJobKind: func(mgr manager.Manager, enableGangScheduling bool, controllerThreads int) error { - return paddlecontroller.NewReconciler(mgr, enableGangScheduling).SetupWithManager(mgr, controllerThreads) + kubeflowv1.PaddleJobKind: func(mgr manager.Manager, gangSchedulingSetupFunc common.GangSchedulingSetupFunc, controllerThreads int) error { + return paddlecontroller.NewReconciler(mgr, gangSchedulingSetupFunc).SetupWithManager(mgr, controllerThreads) }, } diff --git a/pkg/controller.v1/tensorflow/suite_test.go b/pkg/controller.v1/tensorflow/suite_test.go index e5a7cc9c3e..e7b69e3446 100644 --- a/pkg/controller.v1/tensorflow/suite_test.go +++ b/pkg/controller.v1/tensorflow/suite_test.go @@ -21,6 +21,9 @@ import ( "testing" "time" + "github.com/kubeflow/common/pkg/controller.v1/common" + kubeflowv1 "github.com/kubeflow/training-operator/pkg/apis/kubeflow.org/v1" + . "github.com/onsi/ginkgo/v2" . 
"github.com/onsi/gomega" corev1 "k8s.io/api/core/v1" @@ -30,9 +33,7 @@ import ( "sigs.k8s.io/controller-runtime/pkg/envtest" logf "sigs.k8s.io/controller-runtime/pkg/log" "sigs.k8s.io/controller-runtime/pkg/log/zap" - v1beta1 "volcano.sh/apis/pkg/apis/scheduling/v1beta1" - - kubeflowv1 "github.com/kubeflow/training-operator/pkg/apis/kubeflow.org/v1" + "volcano.sh/apis/pkg/apis/scheduling/v1beta1" //+kubebuilder:scaffold:imports ) @@ -88,7 +89,8 @@ var _ = BeforeSuite(func() { }) Expect(err).NotTo(HaveOccurred()) - reconciler = NewReconciler(mgr, false) + gangSchedulingSetupFunc := common.GenNonGangSchedulerSetupFunc() + reconciler = NewReconciler(mgr, gangSchedulingSetupFunc) Expect(reconciler.SetupWithManager(mgr, 1)).NotTo(HaveOccurred()) go func() { diff --git a/pkg/controller.v1/tensorflow/tfjob_controller.go b/pkg/controller.v1/tensorflow/tfjob_controller.go index d985179c5a..6011dab5ce 100644 --- a/pkg/controller.v1/tensorflow/tfjob_controller.go +++ b/pkg/controller.v1/tensorflow/tfjob_controller.go @@ -21,13 +21,17 @@ import ( "strings" "time" - "github.com/go-logr/logr" commonv1 "github.com/kubeflow/common/pkg/apis/common/v1" "github.com/kubeflow/common/pkg/controller.v1/common" "github.com/kubeflow/common/pkg/controller.v1/control" "github.com/kubeflow/common/pkg/controller.v1/expectation" commonutil "github.com/kubeflow/common/pkg/util" train_util "github.com/kubeflow/common/pkg/util/train" + kubeflowv1 "github.com/kubeflow/training-operator/pkg/apis/kubeflow.org/v1" + trainingoperatorcommon "github.com/kubeflow/training-operator/pkg/common" + "github.com/kubeflow/training-operator/pkg/common/util" + + "github.com/go-logr/logr" "github.com/sirupsen/logrus" corev1 "k8s.io/api/core/v1" v1 "k8s.io/api/core/v1" @@ -49,12 +53,8 @@ import ( "sigs.k8s.io/controller-runtime/pkg/manager" "sigs.k8s.io/controller-runtime/pkg/predicate" "sigs.k8s.io/controller-runtime/pkg/source" + schedulerpluginsv1alpha1 "sigs.k8s.io/scheduler-plugins/apis/scheduling/v1alpha1" "volcano.sh/apis/pkg/apis/scheduling/v1beta1" - volcanoclient "volcano.sh/apis/pkg/client/clientset/versioned" - - kubeflowv1 "github.com/kubeflow/training-operator/pkg/apis/kubeflow.org/v1" - trainingoperatorcommon "github.com/kubeflow/training-operator/pkg/common" - "github.com/kubeflow/training-operator/pkg/common/util" ) const ( @@ -75,8 +75,6 @@ const ( // volcanoTaskSpecKey task spec key used in pod annotation when EnableGangScheduling is true volcanoTaskSpecKey = "volcano.sh/task-spec" - // gang scheduler name. - gangSchedulerName = "volcano" // tfConfig is the environment variable name of TensorFlow cluster spec. tfConfig = "TF_CONFIG" // exitedWithCodeReason is the normal reason when the pod is exited because of the exit code. 
@@ -87,11 +85,9 @@ const ( // podTemplateSchedulerNameReason is the warning reason when other scheduler name is set // in pod templates with gang-scheduling enabled podTemplateSchedulerNameReason = "SettedPodTemplateSchedulerName" - // gangSchedulingPodGroupAnnotation is the annotation key used by batch schedulers - gangSchedulingPodGroupAnnotation = "scheduling.k8s.io/group-name" ) -func NewReconciler(mgr manager.Manager, enableGangScheduling bool) *TFJobReconciler { +func NewReconciler(mgr manager.Manager, gangSchedulingSetupFunc common.GangSchedulingSetupFunc) *TFJobReconciler { r := &TFJobReconciler{ Client: mgr.GetClient(), Scheme: mgr.GetScheme(), @@ -102,24 +98,23 @@ func NewReconciler(mgr manager.Manager, enableGangScheduling bool) *TFJobReconci cfg := mgr.GetConfig() kubeClientSet := kubeclientset.NewForConfigOrDie(cfg) - volcanoClientSet := volcanoclient.NewForConfigOrDie(cfg) sharedInformers := informers.NewSharedInformerFactory(kubeClientSet, 0) - priorityClassInformer := sharedInformers.Scheduling().V1beta1().PriorityClasses() + priorityClassInformer := sharedInformers.Scheduling().V1().PriorityClasses() r.JobController = common.JobController{ Controller: r, Expectations: expectation.NewControllerExpectations(), - Config: common.JobControllerConfiguration{EnableGangScheduling: enableGangScheduling}, WorkQueue: &util.FakeWorkQueue{}, Recorder: r.recorder, KubeClientSet: kubeClientSet, - VolcanoClientSet: volcanoClientSet, PriorityClassLister: priorityClassInformer.Lister(), PriorityClassInformerSynced: priorityClassInformer.Informer().HasSynced, PodControl: control.RealPodControl{KubeClient: kubeClientSet, Recorder: r.recorder}, ServiceControl: control.RealServiceControl{KubeClient: kubeClientSet, Recorder: r.recorder}, } + gangSchedulingSetupFunc(&r.JobController) + return r } @@ -234,11 +229,12 @@ func (r *TFJobReconciler) SetupWithManager(mgr ctrl.Manager, controllerThreads i }); err != nil { return err } - // skip watching podgroup if podgroup is not installed + + // skip watching volcano PodGroup if volcano PodGroup is not installed _, err = mgr.GetRESTMapper().RESTMapping(schema.GroupKind{Group: v1beta1.SchemeGroupVersion.Group, Kind: "PodGroup"}, v1beta1.SchemeGroupVersion.Version) if err == nil { - // inject watching for job related podgroup + // inject watching for job related volcano PodGroup if err = c.Watch(&source.Kind{Type: &v1beta1.PodGroup{}}, &handler.EnqueueRequestForOwner{ IsController: true, OwnerType: &kubeflowv1.TFJob{}, @@ -251,6 +247,24 @@ func (r *TFJobReconciler) SetupWithManager(mgr ctrl.Manager, controllerThreads i } } + // skip watching scheduler-plugins PodGroup if scheduler-plugins PodGroup is not installed + _, err = mgr.GetRESTMapper().RESTMapping( + schema.GroupKind{Group: schedulerpluginsv1alpha1.SchemeGroupVersion.Group, Kind: "PodGroup"}, + schedulerpluginsv1alpha1.SchemeGroupVersion.Version) + if err == nil { + // inject watching for job related scheduler-plugins PodGroup + if err = c.Watch(&source.Kind{Type: &schedulerpluginsv1alpha1.PodGroup{}}, &handler.EnqueueRequestForOwner{ + IsController: true, + OwnerType: &kubeflowv1.TFJob{}, + }, predicate.Funcs{ + CreateFunc: util.OnDependentCreateFuncGeneric(r.Expectations), + UpdateFunc: util.OnDependentUpdateFuncGeneric(&r.JobController), + DeleteFunc: util.OnDependentDeleteFuncGeneric(r.Expectations), + }); err != nil { + return err + } + } + return nil } @@ -868,8 +882,10 @@ func (r *TFJobReconciler) createNewPod(tfjob *kubeflowv1.TFJob, rt, index string // if gang-scheduling is enabled: // 
1. if the user has specified another scheduler, we report a warning without overriding any fields. // 2. if no SchedulerName is set for pods, then we set the SchedulerName to the configured gang scheduler's name. - if r.Config.EnableGangScheduling { + if r.Config.EnableGangScheduling() { podSchedulerName := util.GetSchedulerName(replicas) + gangSchedulerName := r.PodGroupControl.GetSchedulerName() + if len(podSchedulerName) == 0 { podTemplate.Spec.SchedulerName = gangSchedulerName } else if strings.Compare(podSchedulerName, gangSchedulerName) != 0 { @@ -878,11 +894,10 @@ func (r *TFJobReconciler) createNewPod(tfjob *kubeflowv1.TFJob, rt, index string r.Recorder.Event(tfjob, v1.EventTypeWarning, podTemplateSchedulerNameReason, errMsg) } - if podTemplate.Annotations == nil { - podTemplate.Annotations = map[string]string{} + r.PodGroupControl.DecoratePodTemplateSpec(podTemplate, tfjob, rt) + if gangSchedulerName == "volcano" { + podTemplate.Annotations[volcanoTaskSpecKey] = rt } - podTemplate.Annotations[gangSchedulingPodGroupAnnotation] = tfjob.GetName() - podTemplate.Annotations[volcanoTaskSpecKey] = rt } err = r.PodControl.CreatePodsWithControllerRef(tfjob.Namespace, podTemplate, tfjob, controllerRef) diff --git a/pkg/controller.v1/xgboost/xgboostjob_controller.go b/pkg/controller.v1/xgboost/xgboostjob_controller.go index 26680e018d..7e6f2628d0 100644 --- a/pkg/controller.v1/xgboost/xgboostjob_controller.go +++ b/pkg/controller.v1/xgboost/xgboostjob_controller.go @@ -20,13 +20,17 @@ import ( "reflect" "time" - "github.com/go-logr/logr" commonv1 "github.com/kubeflow/common/pkg/apis/common/v1" "github.com/kubeflow/common/pkg/controller.v1/common" "github.com/kubeflow/common/pkg/controller.v1/control" "github.com/kubeflow/common/pkg/controller.v1/expectation" commonutil "github.com/kubeflow/common/pkg/util" logger "github.com/kubeflow/common/pkg/util" + kubeflowv1 "github.com/kubeflow/training-operator/pkg/apis/kubeflow.org/v1" + trainingoperatorcommon "github.com/kubeflow/training-operator/pkg/common" + "github.com/kubeflow/training-operator/pkg/common/util" + + "github.com/go-logr/logr" "github.com/sirupsen/logrus" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/errors" @@ -49,12 +53,8 @@ import ( "sigs.k8s.io/controller-runtime/pkg/predicate" "sigs.k8s.io/controller-runtime/pkg/reconcile" "sigs.k8s.io/controller-runtime/pkg/source" + schedulerpluginsv1alpha1 "sigs.k8s.io/scheduler-plugins/apis/scheduling/v1alpha1" "volcano.sh/apis/pkg/apis/scheduling/v1beta1" - volcanoclient "volcano.sh/apis/pkg/client/clientset/versioned" - - kubeflowv1 "github.com/kubeflow/training-operator/pkg/apis/kubeflow.org/v1" - trainingoperatorcommon "github.com/kubeflow/training-operator/pkg/common" - "github.com/kubeflow/training-operator/pkg/common/util" ) const ( @@ -76,7 +76,7 @@ const ( ) // NewReconciler creates an XGBoostJob Reconciler -func NewReconciler(mgr manager.Manager, scheduling bool) *XGBoostJobReconciler { +func NewReconciler(mgr manager.Manager, gangSchedulingSetupFunc common.GangSchedulingSetupFunc) *XGBoostJobReconciler { r := &XGBoostJobReconciler{ Client: mgr.GetClient(), Scheme: mgr.GetScheme(), @@ -88,25 +88,24 @@ func NewReconciler(mgr manager.Manager, scheduling bool) *XGBoostJobReconciler { // Create clients cfg := mgr.GetConfig() kubeClientSet := kubeclientset.NewForConfigOrDie(cfg) - volcanoClientSet := volcanoclient.NewForConfigOrDie(cfg) sharedInformers =
informers.NewSharedInformerFactory(kubeClientSet, 0) - priorityClassInformer := sharedInformers.Scheduling().V1beta1().PriorityClasses() + priorityClassInformer := sharedInformers.Scheduling().V1().PriorityClasses() // Initialize common job controller r.JobController = common.JobController{ Controller: r, Expectations: expectation.NewControllerExpectations(), - Config: common.JobControllerConfiguration{EnableGangScheduling: false}, WorkQueue: &util.FakeWorkQueue{}, Recorder: r.recorder, KubeClientSet: kubeClientSet, - VolcanoClientSet: volcanoClientSet, PriorityClassLister: priorityClassInformer.Lister(), PriorityClassInformerSynced: priorityClassInformer.Informer().HasSynced, PodControl: control.RealPodControl{KubeClient: kubeClientSet, Recorder: r.recorder}, ServiceControl: control.RealServiceControl{KubeClient: kubeClientSet, Recorder: r.recorder}, } + gangSchedulingSetupFunc(&r.JobController) + return r } @@ -223,11 +222,12 @@ func (r *XGBoostJobReconciler) SetupWithManager(mgr ctrl.Manager, controllerThre }); err != nil { return err } - // skip watching podgroup if podgroup is not installed + + // skip watching volcano PodGroup if volcano PodGroup is not installed _, err = mgr.GetRESTMapper().RESTMapping(schema.GroupKind{Group: v1beta1.SchemeGroupVersion.Group, Kind: "PodGroup"}, v1beta1.SchemeGroupVersion.Version) if err == nil { - // inject watching for job related podgroup + // inject watching for job related volcano PodGroup if err = c.Watch(&source.Kind{Type: &v1beta1.PodGroup{}}, &handler.EnqueueRequestForOwner{ IsController: true, OwnerType: &kubeflowv1.XGBoostJob{}, @@ -240,6 +240,24 @@ func (r *XGBoostJobReconciler) SetupWithManager(mgr ctrl.Manager, controllerThre } } + // skip watching scheduler-plugins PodGroup if scheduler-plugins PodGroup is not installed + _, err = mgr.GetRESTMapper().RESTMapping( + schema.GroupKind{Group: schedulerpluginsv1alpha1.SchemeGroupVersion.Group, Kind: "PodGroup"}, + schedulerpluginsv1alpha1.SchemeGroupVersion.Version) + if err == nil { + // inject watching for job related scheduler-plugins PodGroup + if err = c.Watch(&source.Kind{Type: &schedulerpluginsv1alpha1.PodGroup{}}, &handler.EnqueueRequestForOwner{ + IsController: true, + OwnerType: &kubeflowv1.XGBoostJob{}, + }, predicate.Funcs{ + CreateFunc: util.OnDependentCreateFuncGeneric(r.Expectations), + UpdateFunc: util.OnDependentUpdateFuncGeneric(&r.JobController), + DeleteFunc: util.OnDependentDeleteFuncGeneric(r.Expectations), + }); err != nil { + return err + } + } + return nil } diff --git a/sdk/python/docs/V1ReplicaStatus.md b/sdk/python/docs/V1ReplicaStatus.md index 74f8a8841a..2a0d3a3b8f 100644 --- a/sdk/python/docs/V1ReplicaStatus.md +++ b/sdk/python/docs/V1ReplicaStatus.md @@ -6,7 +6,8 @@ Name | Type | Description | Notes ------------ | ------------- | ------------- | ------------- **active** | **int** | The number of actively running pods. | [optional] **failed** | **int** | The number of pods which reached phase Failed. | [optional] -**label_selector** | **str** | A label selector is a label query over a set of resources. The result of matchLabels and matchExpressions are ANDed. An empty label selector matches all objects. A null label selector matches no objects. | [optional] +**label_selector** | [**V1LabelSelector**](V1LabelSelector.md) | | [optional] +**selector** | **str** | A Selector is a label query over a set of resources. The result of matchLabels and matchExpressions are ANDed. An empty Selector matches all objects. A null Selector matches no objects. 
| [optional] **succeeded** | **int** | The number of pods which reached phase Succeeded. | [optional] [[Back to Model list]](../README.md#documentation-for-models) [[Back to API list]](../README.md#documentation-for-api-endpoints) [[Back to README]](../README.md) diff --git a/sdk/python/docs/V1SchedulingPolicy.md b/sdk/python/docs/V1SchedulingPolicy.md index 0832b1de97..5dfb9a4c65 100644 --- a/sdk/python/docs/V1SchedulingPolicy.md +++ b/sdk/python/docs/V1SchedulingPolicy.md @@ -8,6 +8,7 @@ Name | Type | Description | Notes **min_resources** | [**dict(str, Quantity)**](Quantity.md) | | [optional] **priority_class** | **str** | | [optional] **queue** | **str** | | [optional] +**schedule_timeout_seconds** | **int** | | [optional] [[Back to Model list]](../README.md#documentation-for-models) [[Back to API list]](../README.md#documentation-for-api-endpoints) [[Back to README]](../README.md) diff --git a/sdk/python/kubeflow/training/models/v1_replica_status.py b/sdk/python/kubeflow/training/models/v1_replica_status.py index df017ee90c..3e2e7d9585 100644 --- a/sdk/python/kubeflow/training/models/v1_replica_status.py +++ b/sdk/python/kubeflow/training/models/v1_replica_status.py @@ -35,7 +35,8 @@ class V1ReplicaStatus(object): openapi_types = { 'active': 'int', 'failed': 'int', - 'label_selector': 'str', + 'label_selector': 'V1LabelSelector', + 'selector': 'str', 'succeeded': 'int' } @@ -43,10 +44,11 @@ class V1ReplicaStatus(object): 'active': 'active', 'failed': 'failed', 'label_selector': 'labelSelector', + 'selector': 'selector', 'succeeded': 'succeeded' } - def __init__(self, active=None, failed=None, label_selector=None, succeeded=None, local_vars_configuration=None): # noqa: E501 + def __init__(self, active=None, failed=None, label_selector=None, selector=None, succeeded=None, local_vars_configuration=None): # noqa: E501 """V1ReplicaStatus - a model defined in OpenAPI""" # noqa: E501 if local_vars_configuration is None: local_vars_configuration = Configuration() @@ -55,6 +57,7 @@ def __init__(self, active=None, failed=None, label_selector=None, succeeded=None self._active = None self._failed = None self._label_selector = None + self._selector = None self._succeeded = None self.discriminator = None @@ -64,6 +67,8 @@ def __init__(self, active=None, failed=None, label_selector=None, succeeded=None self.failed = failed if label_selector is not None: self.label_selector = label_selector + if selector is not None: + self.selector = selector if succeeded is not None: self.succeeded = succeeded @@ -117,10 +122,9 @@ def failed(self, failed): def label_selector(self): """Gets the label_selector of this V1ReplicaStatus. # noqa: E501 - A label selector is a label query over a set of resources. The result of matchLabels and matchExpressions are ANDed. An empty label selector matches all objects. A null label selector matches no objects. # noqa: E501 :return: The label_selector of this V1ReplicaStatus. # noqa: E501 - :rtype: str + :rtype: V1LabelSelector """ return self._label_selector @@ -128,14 +132,36 @@ def label_selector(self): def label_selector(self, label_selector): """Sets the label_selector of this V1ReplicaStatus. - A label selector is a label query over a set of resources. The result of matchLabels and matchExpressions are ANDed. An empty label selector matches all objects. A null label selector matches no objects. # noqa: E501 :param label_selector: The label_selector of this V1ReplicaStatus. 
# noqa: E501 - :type: str + :type: V1LabelSelector """ self._label_selector = label_selector + @property + def selector(self): + """Gets the selector of this V1ReplicaStatus. # noqa: E501 + + A Selector is a label query over a set of resources. The result of matchLabels and matchExpressions are ANDed. An empty Selector matches all objects. A null Selector matches no objects. # noqa: E501 + + :return: The selector of this V1ReplicaStatus. # noqa: E501 + :rtype: str + """ + return self._selector + + @selector.setter + def selector(self, selector): + """Sets the selector of this V1ReplicaStatus. + + A Selector is a label query over a set of resources. The result of matchLabels and matchExpressions are ANDed. An empty Selector matches all objects. A null Selector matches no objects. # noqa: E501 + + :param selector: The selector of this V1ReplicaStatus. # noqa: E501 + :type: str + """ + + self._selector = selector + @property def succeeded(self): """Gets the succeeded of this V1ReplicaStatus. # noqa: E501 diff --git a/sdk/python/kubeflow/training/models/v1_scheduling_policy.py b/sdk/python/kubeflow/training/models/v1_scheduling_policy.py index 53a062c131..775b341204 100644 --- a/sdk/python/kubeflow/training/models/v1_scheduling_policy.py +++ b/sdk/python/kubeflow/training/models/v1_scheduling_policy.py @@ -36,17 +36,19 @@ class V1SchedulingPolicy(object): 'min_available': 'int', 'min_resources': 'dict(str, Quantity)', 'priority_class': 'str', - 'queue': 'str' + 'queue': 'str', + 'schedule_timeout_seconds': 'int' } attribute_map = { 'min_available': 'minAvailable', 'min_resources': 'minResources', 'priority_class': 'priorityClass', - 'queue': 'queue' + 'queue': 'queue', + 'schedule_timeout_seconds': 'scheduleTimeoutSeconds' } - def __init__(self, min_available=None, min_resources=None, priority_class=None, queue=None, local_vars_configuration=None): # noqa: E501 + def __init__(self, min_available=None, min_resources=None, priority_class=None, queue=None, schedule_timeout_seconds=None, local_vars_configuration=None): # noqa: E501 """V1SchedulingPolicy - a model defined in OpenAPI""" # noqa: E501 if local_vars_configuration is None: local_vars_configuration = Configuration() @@ -56,6 +58,7 @@ def __init__(self, min_available=None, min_resources=None, priority_class=None, self._min_resources = None self._priority_class = None self._queue = None + self._schedule_timeout_seconds = None self.discriminator = None if min_available is not None: @@ -66,6 +69,8 @@ def __init__(self, min_available=None, min_resources=None, priority_class=None, self.priority_class = priority_class if queue is not None: self.queue = queue + if schedule_timeout_seconds is not None: + self.schedule_timeout_seconds = schedule_timeout_seconds @property def min_available(self): @@ -151,6 +156,27 @@ def queue(self, queue): self._queue = queue + @property + def schedule_timeout_seconds(self): + """Gets the schedule_timeout_seconds of this V1SchedulingPolicy. # noqa: E501 + + + :return: The schedule_timeout_seconds of this V1SchedulingPolicy. # noqa: E501 + :rtype: int + """ + return self._schedule_timeout_seconds + + @schedule_timeout_seconds.setter + def schedule_timeout_seconds(self, schedule_timeout_seconds): + """Sets the schedule_timeout_seconds of this V1SchedulingPolicy. + + + :param schedule_timeout_seconds: The schedule_timeout_seconds of this V1SchedulingPolicy. 
# noqa: E501 + :type: int + """ + + self._schedule_timeout_seconds = schedule_timeout_seconds + def to_dict(self): """Returns the model properties as a dict""" result = {} diff --git a/sdk/python/test/test_kubeflow_org_v1_mpi_job.py b/sdk/python/test/test_kubeflow_org_v1_mpi_job.py index a39be0b4ab..5bb33e150c 100644 --- a/sdk/python/test/test_kubeflow_org_v1_mpi_job.py +++ b/sdk/python/test/test_kubeflow_org_v1_mpi_job.py @@ -58,7 +58,8 @@ def make_instance(self, include_optional): 'key' : None }, priority_class = '0', - queue = '0', ), + queue = '0', + schedule_timeout_seconds = 56, ), ttl_seconds_after_finished = 56, ), slots_per_worker = 56, ), status = V1JobStatus( @@ -77,7 +78,8 @@ def make_instance(self, include_optional): 'key' : V1ReplicaStatus( active = 56, failed = 56, - label_selector = '0', + label_selector = None, + selector = '0', succeeded = 56, ) }, start_time = None, ) diff --git a/sdk/python/test/test_kubeflow_org_v1_mpi_job_list.py b/sdk/python/test/test_kubeflow_org_v1_mpi_job_list.py index 67ee2d394a..bcc1bad6b9 100644 --- a/sdk/python/test/test_kubeflow_org_v1_mpi_job_list.py +++ b/sdk/python/test/test_kubeflow_org_v1_mpi_job_list.py @@ -61,7 +61,8 @@ def make_instance(self, include_optional): 'key' : None }, priority_class = '0', - queue = '0', ), + queue = '0', + schedule_timeout_seconds = 56, ), ttl_seconds_after_finished = 56, ), slots_per_worker = 56, ), status = V1JobStatus( @@ -80,7 +81,8 @@ def make_instance(self, include_optional): 'key' : V1ReplicaStatus( active = 56, failed = 56, - label_selector = '0', + label_selector = None, + selector = '0', succeeded = 56, ) }, start_time = None, ), ) @@ -114,7 +116,8 @@ def make_instance(self, include_optional): 'key' : None }, priority_class = '0', - queue = '0', ), + queue = '0', + schedule_timeout_seconds = 56, ), ttl_seconds_after_finished = 56, ), slots_per_worker = 56, ), status = V1JobStatus( @@ -133,7 +136,8 @@ def make_instance(self, include_optional): 'key' : V1ReplicaStatus( active = 56, failed = 56, - label_selector = '0', + label_selector = None, + selector = '0', succeeded = 56, ) }, start_time = None, ), ) diff --git a/sdk/python/test/test_kubeflow_org_v1_mpi_job_spec.py b/sdk/python/test/test_kubeflow_org_v1_mpi_job_spec.py index 4e9671282e..fca79722e5 100644 --- a/sdk/python/test/test_kubeflow_org_v1_mpi_job_spec.py +++ b/sdk/python/test/test_kubeflow_org_v1_mpi_job_spec.py @@ -54,7 +54,8 @@ def make_instance(self, include_optional): 'key' : None }, priority_class = '0', - queue = '0', ), + queue = '0', + schedule_timeout_seconds = 56, ), ttl_seconds_after_finished = 56, ), slots_per_worker = 56 ) diff --git a/sdk/python/test/test_kubeflow_org_v1_mx_job.py b/sdk/python/test/test_kubeflow_org_v1_mx_job.py index 1fc4a1341c..2f54224baa 100644 --- a/sdk/python/test/test_kubeflow_org_v1_mx_job.py +++ b/sdk/python/test/test_kubeflow_org_v1_mx_job.py @@ -57,7 +57,8 @@ def make_instance(self, include_optional): 'key' : None }, priority_class = '0', - queue = '0', ), + queue = '0', + schedule_timeout_seconds = 56, ), ttl_seconds_after_finished = 56, ), ), status = V1JobStatus( completion_time = None, @@ -75,7 +76,8 @@ def make_instance(self, include_optional): 'key' : V1ReplicaStatus( active = 56, failed = 56, - label_selector = '0', + label_selector = None, + selector = '0', succeeded = 56, ) }, start_time = None, ) diff --git a/sdk/python/test/test_kubeflow_org_v1_mx_job_list.py b/sdk/python/test/test_kubeflow_org_v1_mx_job_list.py index c072aad162..831f2c1c35 100644 --- 
a/sdk/python/test/test_kubeflow_org_v1_mx_job_list.py +++ b/sdk/python/test/test_kubeflow_org_v1_mx_job_list.py @@ -60,7 +60,8 @@ def make_instance(self, include_optional): 'key' : None }, priority_class = '0', - queue = '0', ), + queue = '0', + schedule_timeout_seconds = 56, ), ttl_seconds_after_finished = 56, ), ), status = V1JobStatus( completion_time = None, @@ -78,7 +79,8 @@ def make_instance(self, include_optional): 'key' : V1ReplicaStatus( active = 56, failed = 56, - label_selector = '0', + label_selector = None, + selector = '0', succeeded = 56, ) }, start_time = None, ), ) @@ -111,7 +113,8 @@ def make_instance(self, include_optional): 'key' : None }, priority_class = '0', - queue = '0', ), + queue = '0', + schedule_timeout_seconds = 56, ), ttl_seconds_after_finished = 56, ), ), status = V1JobStatus( completion_time = None, @@ -129,7 +132,8 @@ def make_instance(self, include_optional): 'key' : V1ReplicaStatus( active = 56, failed = 56, - label_selector = '0', + label_selector = None, + selector = '0', succeeded = 56, ) }, start_time = None, ), ) diff --git a/sdk/python/test/test_kubeflow_org_v1_mx_job_spec.py b/sdk/python/test/test_kubeflow_org_v1_mx_job_spec.py index 8ba6a06b30..83cafecc76 100644 --- a/sdk/python/test/test_kubeflow_org_v1_mx_job_spec.py +++ b/sdk/python/test/test_kubeflow_org_v1_mx_job_spec.py @@ -53,7 +53,8 @@ def make_instance(self, include_optional): 'key' : None }, priority_class = '0', - queue = '0', ), + queue = '0', + schedule_timeout_seconds = 56, ), ttl_seconds_after_finished = 56, ) ) else : @@ -75,7 +76,8 @@ def make_instance(self, include_optional): 'key' : None }, priority_class = '0', - queue = '0', ), + queue = '0', + schedule_timeout_seconds = 56, ), ttl_seconds_after_finished = 56, ), ) diff --git a/sdk/python/test/test_kubeflow_org_v1_paddle_job.py b/sdk/python/test/test_kubeflow_org_v1_paddle_job.py index 7d8b5f6228..1cd71a4372 100644 --- a/sdk/python/test/test_kubeflow_org_v1_paddle_job.py +++ b/sdk/python/test/test_kubeflow_org_v1_paddle_job.py @@ -63,7 +63,8 @@ def make_instance(self, include_optional): 'key' : None }, priority_class = '0', - queue = '0', ), + queue = '0', + schedule_timeout_seconds = 56, ), ttl_seconds_after_finished = 56, ), ), status = V1JobStatus( completion_time = None, @@ -81,7 +82,8 @@ def make_instance(self, include_optional): 'key' : V1ReplicaStatus( active = 56, failed = 56, - label_selector = '0', + label_selector = None, + selector = '0', succeeded = 56, ) }, start_time = None, ) diff --git a/sdk/python/test/test_kubeflow_org_v1_paddle_job_list.py b/sdk/python/test/test_kubeflow_org_v1_paddle_job_list.py index 8801d63285..2de315603a 100644 --- a/sdk/python/test/test_kubeflow_org_v1_paddle_job_list.py +++ b/sdk/python/test/test_kubeflow_org_v1_paddle_job_list.py @@ -66,7 +66,8 @@ def make_instance(self, include_optional): 'key' : None }, priority_class = '0', - queue = '0', ), + queue = '0', + schedule_timeout_seconds = 56, ), ttl_seconds_after_finished = 56, ), ), status = V1JobStatus( completion_time = None, @@ -84,7 +85,8 @@ def make_instance(self, include_optional): 'key' : V1ReplicaStatus( active = 56, failed = 56, - label_selector = '0', + label_selector = None, + selector = '0', succeeded = 56, ) }, start_time = None, ), ) @@ -123,7 +125,8 @@ def make_instance(self, include_optional): 'key' : None }, priority_class = '0', - queue = '0', ), + queue = '0', + schedule_timeout_seconds = 56, ), ttl_seconds_after_finished = 56, ), ), status = V1JobStatus( completion_time = None, @@ -141,7 +144,8 @@ def 
make_instance(self, include_optional): 'key' : V1ReplicaStatus( active = 56, failed = 56, - label_selector = '0', + label_selector = None, + selector = '0', succeeded = 56, ) }, start_time = None, ), ) diff --git a/sdk/python/test/test_kubeflow_org_v1_paddle_job_spec.py b/sdk/python/test/test_kubeflow_org_v1_paddle_job_spec.py index b67bd0bdcf..1e9ebc2e3e 100644 --- a/sdk/python/test/test_kubeflow_org_v1_paddle_job_spec.py +++ b/sdk/python/test/test_kubeflow_org_v1_paddle_job_spec.py @@ -59,7 +59,8 @@ def make_instance(self, include_optional): 'key' : None }, priority_class = '0', - queue = '0', ), + queue = '0', + schedule_timeout_seconds = 56, ), ttl_seconds_after_finished = 56, ) ) else : @@ -80,7 +81,8 @@ def make_instance(self, include_optional): 'key' : None }, priority_class = '0', - queue = '0', ), + queue = '0', + schedule_timeout_seconds = 56, ), ttl_seconds_after_finished = 56, ), ) diff --git a/sdk/python/test/test_kubeflow_org_v1_py_torch_job.py b/sdk/python/test/test_kubeflow_org_v1_py_torch_job.py index 64c658922f..e6fe89f4b1 100644 --- a/sdk/python/test/test_kubeflow_org_v1_py_torch_job.py +++ b/sdk/python/test/test_kubeflow_org_v1_py_torch_job.py @@ -74,7 +74,8 @@ def make_instance(self, include_optional): 'key' : None }, priority_class = '0', - queue = '0', ), + queue = '0', + schedule_timeout_seconds = 56, ), ttl_seconds_after_finished = 56, ), ), status = V1JobStatus( completion_time = None, @@ -92,7 +93,8 @@ def make_instance(self, include_optional): 'key' : V1ReplicaStatus( active = 56, failed = 56, - label_selector = '0', + label_selector = None, + selector = '0', succeeded = 56, ) }, start_time = None, ) diff --git a/sdk/python/test/test_kubeflow_org_v1_py_torch_job_list.py b/sdk/python/test/test_kubeflow_org_v1_py_torch_job_list.py index d7182a7071..c860de7ecb 100644 --- a/sdk/python/test/test_kubeflow_org_v1_py_torch_job_list.py +++ b/sdk/python/test/test_kubeflow_org_v1_py_torch_job_list.py @@ -77,7 +77,8 @@ def make_instance(self, include_optional): 'key' : None }, priority_class = '0', - queue = '0', ), + queue = '0', + schedule_timeout_seconds = 56, ), ttl_seconds_after_finished = 56, ), ), status = V1JobStatus( completion_time = None, @@ -95,7 +96,8 @@ def make_instance(self, include_optional): 'key' : V1ReplicaStatus( active = 56, failed = 56, - label_selector = '0', + label_selector = None, + selector = '0', succeeded = 56, ) }, start_time = None, ), ) @@ -145,7 +147,8 @@ def make_instance(self, include_optional): 'key' : None }, priority_class = '0', - queue = '0', ), + queue = '0', + schedule_timeout_seconds = 56, ), ttl_seconds_after_finished = 56, ), ), status = V1JobStatus( completion_time = None, @@ -163,7 +166,8 @@ def make_instance(self, include_optional): 'key' : V1ReplicaStatus( active = 56, failed = 56, - label_selector = '0', + label_selector = None, + selector = '0', succeeded = 56, ) }, start_time = None, ), ) diff --git a/sdk/python/test/test_kubeflow_org_v1_py_torch_job_spec.py b/sdk/python/test/test_kubeflow_org_v1_py_torch_job_spec.py index d65bb039be..65de87eb14 100644 --- a/sdk/python/test/test_kubeflow_org_v1_py_torch_job_spec.py +++ b/sdk/python/test/test_kubeflow_org_v1_py_torch_job_spec.py @@ -70,7 +70,8 @@ def make_instance(self, include_optional): 'key' : None }, priority_class = '0', - queue = '0', ), + queue = '0', + schedule_timeout_seconds = 56, ), ttl_seconds_after_finished = 56, ) ) else : @@ -91,7 +92,8 @@ def make_instance(self, include_optional): 'key' : None }, priority_class = '0', - queue = '0', ), + queue = '0', + 
schedule_timeout_seconds = 56, ), ttl_seconds_after_finished = 56, ), ) diff --git a/sdk/python/test/test_kubeflow_org_v1_tf_job.py b/sdk/python/test/test_kubeflow_org_v1_tf_job.py index e114fccddb..ce1193272d 100644 --- a/sdk/python/test/test_kubeflow_org_v1_tf_job.py +++ b/sdk/python/test/test_kubeflow_org_v1_tf_job.py @@ -51,7 +51,8 @@ def make_instance(self, include_optional): 'key' : None }, priority_class = '0', - queue = '0', ), + queue = '0', + schedule_timeout_seconds = 56, ), ttl_seconds_after_finished = 56, ), success_policy = '0', tf_replica_specs = { @@ -76,7 +77,8 @@ def make_instance(self, include_optional): 'key' : V1ReplicaStatus( active = 56, failed = 56, - label_selector = '0', + label_selector = None, + selector = '0', succeeded = 56, ) }, start_time = None, ) diff --git a/sdk/python/test/test_kubeflow_org_v1_tf_job_list.py b/sdk/python/test/test_kubeflow_org_v1_tf_job_list.py index 617eef2137..e513a728e3 100644 --- a/sdk/python/test/test_kubeflow_org_v1_tf_job_list.py +++ b/sdk/python/test/test_kubeflow_org_v1_tf_job_list.py @@ -54,7 +54,8 @@ def make_instance(self, include_optional): 'key' : None }, priority_class = '0', - queue = '0', ), + queue = '0', + schedule_timeout_seconds = 56, ), ttl_seconds_after_finished = 56, ), success_policy = '0', tf_replica_specs = { @@ -79,7 +80,8 @@ def make_instance(self, include_optional): 'key' : V1ReplicaStatus( active = 56, failed = 56, - label_selector = '0', + label_selector = None, + selector = '0', succeeded = 56, ) }, start_time = None, ), ) @@ -106,7 +108,8 @@ def make_instance(self, include_optional): 'key' : None }, priority_class = '0', - queue = '0', ), + queue = '0', + schedule_timeout_seconds = 56, ), ttl_seconds_after_finished = 56, ), success_policy = '0', tf_replica_specs = { @@ -131,7 +134,8 @@ def make_instance(self, include_optional): 'key' : V1ReplicaStatus( active = 56, failed = 56, - label_selector = '0', + label_selector = None, + selector = '0', succeeded = 56, ) }, start_time = None, ), ) diff --git a/sdk/python/test/test_kubeflow_org_v1_tf_job_spec.py b/sdk/python/test/test_kubeflow_org_v1_tf_job_spec.py index 278f17b975..fc34a75a2f 100644 --- a/sdk/python/test/test_kubeflow_org_v1_tf_job_spec.py +++ b/sdk/python/test/test_kubeflow_org_v1_tf_job_spec.py @@ -47,7 +47,8 @@ def make_instance(self, include_optional): 'key' : None }, priority_class = '0', - queue = '0', ), + queue = '0', + schedule_timeout_seconds = 56, ), ttl_seconds_after_finished = 56, ), success_policy = '0', tf_replica_specs = { @@ -69,7 +70,8 @@ def make_instance(self, include_optional): 'key' : None }, priority_class = '0', - queue = '0', ), + queue = '0', + schedule_timeout_seconds = 56, ), ttl_seconds_after_finished = 56, ), tf_replica_specs = { 'key' : V1ReplicaSpec( diff --git a/sdk/python/test/test_kubeflow_org_v1_xg_boost_job.py b/sdk/python/test/test_kubeflow_org_v1_xg_boost_job.py index 3d349b7873..f4a8234eb2 100644 --- a/sdk/python/test/test_kubeflow_org_v1_xg_boost_job.py +++ b/sdk/python/test/test_kubeflow_org_v1_xg_boost_job.py @@ -50,7 +50,8 @@ def make_instance(self, include_optional): 'key' : None }, priority_class = '0', - queue = '0', ), + queue = '0', + schedule_timeout_seconds = 56, ), ttl_seconds_after_finished = 56, ), xgb_replica_specs = { 'key' : V1ReplicaSpec( @@ -74,7 +75,8 @@ def make_instance(self, include_optional): 'key' : V1ReplicaStatus( active = 56, failed = 56, - label_selector = '0', + label_selector = None, + selector = '0', succeeded = 56, ) }, start_time = None, ) diff --git 
a/sdk/python/test/test_kubeflow_org_v1_xg_boost_job_list.py b/sdk/python/test/test_kubeflow_org_v1_xg_boost_job_list.py index 1f484d271c..d2801fd4f2 100644 --- a/sdk/python/test/test_kubeflow_org_v1_xg_boost_job_list.py +++ b/sdk/python/test/test_kubeflow_org_v1_xg_boost_job_list.py @@ -53,7 +53,8 @@ def make_instance(self, include_optional): 'key' : None }, priority_class = '0', - queue = '0', ), + queue = '0', + schedule_timeout_seconds = 56, ), ttl_seconds_after_finished = 56, ), xgb_replica_specs = { 'key' : V1ReplicaSpec( @@ -77,7 +78,8 @@ def make_instance(self, include_optional): 'key' : V1ReplicaStatus( active = 56, failed = 56, - label_selector = '0', + label_selector = None, + selector = '0', succeeded = 56, ) }, start_time = None, ), ) @@ -103,7 +105,8 @@ def make_instance(self, include_optional): 'key' : None }, priority_class = '0', - queue = '0', ), + queue = '0', + schedule_timeout_seconds = 56, ), ttl_seconds_after_finished = 56, ), xgb_replica_specs = { 'key' : V1ReplicaSpec( @@ -127,7 +130,8 @@ def make_instance(self, include_optional): 'key' : V1ReplicaStatus( active = 56, failed = 56, - label_selector = '0', + label_selector = None, + selector = '0', succeeded = 56, ) }, start_time = None, ), ) diff --git a/sdk/python/test/test_kubeflow_org_v1_xg_boost_job_spec.py b/sdk/python/test/test_kubeflow_org_v1_xg_boost_job_spec.py index e2a694d53a..dab6abe9ef 100644 --- a/sdk/python/test/test_kubeflow_org_v1_xg_boost_job_spec.py +++ b/sdk/python/test/test_kubeflow_org_v1_xg_boost_job_spec.py @@ -46,7 +46,8 @@ def make_instance(self, include_optional): 'key' : None }, priority_class = '0', - queue = '0', ), + queue = '0', + schedule_timeout_seconds = 56, ), ttl_seconds_after_finished = 56, ), xgb_replica_specs = { 'key' : V1ReplicaSpec( @@ -67,7 +68,8 @@ def make_instance(self, include_optional): 'key' : None }, priority_class = '0', - queue = '0', ), + queue = '0', + schedule_timeout_seconds = 56, ), ttl_seconds_after_finished = 56, ), xgb_replica_specs = { 'key' : V1ReplicaSpec( diff --git a/sdk/python/test/test_v1_job_status.py b/sdk/python/test/test_v1_job_status.py index 12350b6f9a..1306a33fa3 100644 --- a/sdk/python/test/test_v1_job_status.py +++ b/sdk/python/test/test_v1_job_status.py @@ -51,7 +51,8 @@ def make_instance(self, include_optional): 'key' : V1ReplicaStatus( active = 56, failed = 56, - label_selector = '0', + label_selector = None, + selector = '0', succeeded = 56, ) }, start_time = None @@ -71,7 +72,8 @@ def make_instance(self, include_optional): 'key' : V1ReplicaStatus( active = 56, failed = 56, - label_selector = '0', + label_selector = None, + selector = '0', succeeded = 56, ) }, ) diff --git a/sdk/python/test/test_v1_replica_status.py b/sdk/python/test/test_v1_replica_status.py index 17402fa046..e1af62a909 100644 --- a/sdk/python/test/test_v1_replica_status.py +++ b/sdk/python/test/test_v1_replica_status.py @@ -38,7 +38,8 @@ def make_instance(self, include_optional): return V1ReplicaStatus( active = 56, failed = 56, - label_selector = '0', + label_selector = None, + selector = '0', succeeded = 56 ) else : diff --git a/sdk/python/test/test_v1_run_policy.py b/sdk/python/test/test_v1_run_policy.py index 47590e23e0..ca99ac2334 100644 --- a/sdk/python/test/test_v1_run_policy.py +++ b/sdk/python/test/test_v1_run_policy.py @@ -45,7 +45,8 @@ def make_instance(self, include_optional): 'key' : None }, priority_class = '0', - queue = '0', ), + queue = '0', + schedule_timeout_seconds = 56, ), ttl_seconds_after_finished = 56 ) else : diff --git 
a/sdk/python/test/test_v1_scheduling_policy.py b/sdk/python/test/test_v1_scheduling_policy.py index 409ee52b14..5f5c515eba 100644 --- a/sdk/python/test/test_v1_scheduling_policy.py +++ b/sdk/python/test/test_v1_scheduling_policy.py @@ -41,7 +41,8 @@ def make_instance(self, include_optional): 'key' : None }, priority_class = '0', - queue = '0' + queue = '0', + schedule_timeout_seconds = 56 ) else : return V1SchedulingPolicy(
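// For reference, the generated Python fields above mirror the Go API types;
// a hedged Go-side sketch, assuming kubeflow/common v0.4.6 where
// SchedulingPolicy gains ScheduleTimeoutSeconds (*int32) and ReplicaStatus
// gains the string Selector (values below are placeholders):
timeout := int32(56)
policy := commonv1.SchedulingPolicy{Queue: "default", ScheduleTimeoutSeconds: &timeout}
rs := commonv1.ReplicaStatus{Active: 1, Selector: "training.kubeflow.org/job-name=demo"} // assumed key
_, _ = policy, rs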