diff --git a/README.md b/README.md index b016456..12eab68 100644 --- a/README.md +++ b/README.md @@ -239,6 +239,7 @@ SELECT * from reservations; - we need to use shrink or partial cancel here. And a shrink down to size 0 I assume is a cancel. - [ ] For cancel, we would issue a cancel for every pod associated with a job. How can we avoid that (or is that OK?) - [ ] we will eventually need another mechanism to move schedule queue aside from new submission +- What if, instead of a loop, we do a reservation every N jobs? Then we wouldn't need a loop. - [ ] scheduleAt can be used to AskFlux in the future - [ ] Nodes that are currently assigned need to be taken into account - Right now they aren't included in resources, but instead should be "given" to Fluxion. diff --git a/api/v1alpha1/fluxjob_types.go b/api/v1alpha1/fluxjob_types.go index a6d8468..36b8af3 100644 --- a/api/v1alpha1/fluxjob_types.go +++ b/api/v1alpha1/fluxjob_types.go @@ -104,9 +104,13 @@ type FluxJobSpec struct { // +optional Reservation bool `json:"reservation,omitempty"` - // Nodes needed for the job + // Slots needed for the job // +optional - Nodes int32 `json:"nodes"` + Slots int32 `json:"nodes"` + + // Cores per pod (slot) + // +optional + Cores int32 `json:"cores"` // Resources assigned // +optional diff --git a/api/v1alpha1/submit.go b/api/v1alpha1/submit.go index 6fde320..3125b95 100644 --- a/api/v1alpha1/submit.go +++ b/api/v1alpha1/submit.go @@ -2,16 +2,13 @@ package v1alpha1 import ( "context" - "fmt" - jobspec "github.com/compspec/jobspec-go/pkg/jobspec/v1" jspec "github.com/converged-computing/fluxqueue/pkg/jobspec" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" ctrl "sigs.k8s.io/controller-runtime" - "sigs.k8s.io/controller-runtime/pkg/client" ) @@ -21,13 +18,14 @@ var ( ) // SubmitFluxJob wraps a pod or job spec into a FluxJob -// We essentially create a CRD for a a FluxJob +// We essentially create a CRD for a FluxJob. Note that we are asking +// for SLOTS and not nodes - a slot can be a part of a node. func SubmitFluxJob( ctx context.Context, jobType JobWrapped, name string, namespace string, - nodes int32, + slots int32, containers []corev1.Container, ) error { @@ -51,14 +49,14 @@ func SubmitFluxJob( slog.Error(err, "Issue with getting job", "Namespace", namespace, "Name", jobName) return err } - resources := jspec.GeneratePodResources(containers) + resources := jspec.GeneratePodResources(containers, slots) // Artificially create a command for the name and namespace - command := fmt.Sprintf("echo %s %s", namespace, name) + command := []string{"echo", namespace, name} // Generate a jobspec for that many nodes (starting simple) // TODO will need to add GPU and memory here...
if Flux supports memory - js, err := jobspec.NewSimpleJobspec(name, command, nodes, resources.Cpu) + js, err := jspec.NewJobspec(name, command, resources) if err != nil { slog.Error(err, "Issue with creating job", "Namespace", namespace, "Name", jobName) return err @@ -80,9 +78,10 @@ func SubmitFluxJob( ObjectMeta: metav1.ObjectMeta{Name: jobName, Namespace: namespace}, Spec: FluxJobSpec{ JobSpec: jsString, - Nodes: nodes, + Slots: slots, Type: jobType, Name: name, + Cores: resources.Slot.Cpu, }, Status: FluxJobStatus{ SubmitStatus: SubmitStatusNew, diff --git a/build/postgres/create-tables.sql b/build/postgres/create-tables.sql index 180cb4d..d4bd4e0 100644 --- a/build/postgres/create-tables.sql +++ b/build/postgres/create-tables.sql @@ -8,7 +8,8 @@ CREATE TABLE pending_queue ( reservation INTEGER NOT NULL, duration INTEGER NOT NULL, created_at timestamptz NOT NULL default NOW(), - size INTEGER NOT NULL + size INTEGER NOT NULL, + cores INTEGER NOT NULL ); CREATE UNIQUE INDEX pending_index ON pending_queue (name, namespace); diff --git a/chart/templates/fluxjob-crd.yaml b/chart/templates/fluxjob-crd.yaml index 7da0897..45a56c5 100644 --- a/chart/templates/fluxjob-crd.yaml +++ b/chart/templates/fluxjob-crd.yaml @@ -65,6 +65,10 @@ spec: A FluxJob is a mapping of a Kubernetes abstraction (e.g., job) into a Flux JobSpec, one that Fluxion can digest. properties: + cores: + description: Cores per pod (slot) + format: int32 + type: integer duration: description: Duration is the maximum runtime of the job format: int32 @@ -76,7 +80,7 @@ spec: description: Original name of the job type: string nodes: - description: Nodes needed for the job + description: Slots needed for the job format: int32 type: integer object: diff --git a/config/crd/bases/jobs.converged-computing.org_fluxjobs.yaml b/config/crd/bases/jobs.converged-computing.org_fluxjobs.yaml index 4ee48dc..190d832 100644 --- a/config/crd/bases/jobs.converged-computing.org_fluxjobs.yaml +++ b/config/crd/bases/jobs.converged-computing.org_fluxjobs.yaml @@ -52,6 +52,10 @@ spec: A FluxJob is a mapping of a Kubernetes abstraction (e.g., job) into a Flux JobSpec, one that Fluxion can digest. 
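The slot-based request introduced in this change is assembled by summing a pod's container requests into one slot and asking for that slot N times. A minimal sketch of that flow, not part of this changeset, using the GeneratePodResources and NewJobspec signatures shown above; the container values and job name are illustrative:

package main

import (
	"fmt"

	jspec "github.com/converged-computing/fluxqueue/pkg/jobspec"
	corev1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/api/resource"
)

func main() {
	// One pod (slot) with two containers; requests are summed into the slot
	containers := []corev1.Container{
		{Resources: corev1.ResourceRequirements{Requests: corev1.ResourceList{
			corev1.ResourceCPU:    resource.MustParse("2"),
			corev1.ResourceMemory: resource.MustParse("4Gi"),
		}}},
		{Resources: corev1.ResourceRequirements{Requests: corev1.ResourceList{
			corev1.ResourceCPU: resource.MustParse("1"),
		}}},
	}

	// Ask for 4 slots (pods), each with the summed resources above (3 cores, 4Gi)
	resources := jspec.GeneratePodResources(containers, 4)

	// The command is only a placeholder recording namespace and name
	js, err := jspec.NewJobspec("my-job", []string{"echo", "default", "my-job"}, resources)
	if err != nil {
		panic(err)
	}
	fmt.Printf("slots=%d cores-per-slot=%d\n", resources.Count, resources.Slot.Cpu)
	fmt.Printf("%+v\n", js)
}

Under these assumptions the generated jobspec asks Fluxion for four slots of three cores each, rather than for whole nodes.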
properties: + cores: + description: Cores per pod (slot) + format: int32 + type: integer duration: description: Duration is the maximum runtime of the job format: int32 @@ -63,7 +67,7 @@ spec: description: Original name of the job type: string nodes: - description: Nodes needed for the job + description: Slots needed for the job format: int32 type: integer object: diff --git a/go.mod b/go.mod index a64e7a1..5f21e20 100644 --- a/go.mod +++ b/go.mod @@ -3,9 +3,6 @@ module github.com/converged-computing/fluxqueue go 1.22.0 require ( - github.com/compspec/jobspec-go v0.0.0-20240510054255-ee02cdc7d3d4 - github.com/converged-computing/fluxion v0.0.0-20250105140137-04388a62d0fa - github.com/jackc/pgx v3.6.2+incompatible github.com/jackc/pgx/v5 v5.7.2 github.com/onsi/ginkgo/v2 v2.19.0 github.com/onsi/gomega v1.33.1 @@ -14,14 +11,18 @@ require ( github.com/riverqueue/river/rivershared v0.15.0 google.golang.org/grpc v1.65.0 google.golang.org/protobuf v1.34.2 + gopkg.in/yaml.v2 v2.4.0 k8s.io/api v0.31.2 k8s.io/apimachinery v0.31.2 k8s.io/client-go v0.31.2 - k8s.io/klog v1.0.0 - k8s.io/klog/v2 v2.130.1 sigs.k8s.io/controller-runtime v0.19.3 ) +require ( + github.com/compspec/jobspec-go v0.0.0-20250130030627-58df7d7ed642 // indirect + github.com/converged-computing/fluxion v0.0.0-20250130025038-615e35a80230 // indirect +) + require ( github.com/antlr4-go/antlr/v4 v4.13.0 // indirect github.com/asaskevich/govalidator v0.0.0-20190424111038-f61b66f89f4a // indirect @@ -30,7 +31,7 @@ require ( github.com/cenkalti/backoff/v4 v4.3.0 // indirect github.com/cespare/xxhash/v2 v2.3.0 // indirect github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect - github.com/emicklei/go-restful/v3 v3.11.0 // indirect + github.com/emicklei/go-restful/v3 v3.11.0 // jo github.com/evanphx/json-patch/v5 v5.9.0 // indirect github.com/felixge/httpsnoop v1.0.4 // indirect github.com/fsnotify/fsnotify v1.7.0 // indirect @@ -106,11 +107,11 @@ require ( google.golang.org/genproto/googleapis/api v0.0.0-20240528184218-531527333157 // indirect google.golang.org/genproto/googleapis/rpc v0.0.0-20240701130421-f6361c86f094 // indirect gopkg.in/inf.v0 v0.9.1 // indirect - gopkg.in/yaml.v2 v2.4.0 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect k8s.io/apiextensions-apiserver v0.31.2 // indirect k8s.io/apiserver v0.31.2 // indirect k8s.io/component-base v0.31.2 // indirect + k8s.io/klog/v2 v2.130.1 // indirect k8s.io/kube-openapi v0.0.0-20240228011516-70dd3763d340 // indirect k8s.io/utils v0.0.0-20240711033017-18e509b52bc8 // indirect sigs.k8s.io/apiserver-network-proxy/konnectivity-client v0.30.3 // indirect diff --git a/go.sum b/go.sum index f13573e..91dd108 100644 --- a/go.sum +++ b/go.sum @@ -10,10 +10,10 @@ github.com/cenkalti/backoff/v4 v4.3.0 h1:MyRJ/UdXutAwSAT+s3wNd7MfTIcy71VQueUuFK3 github.com/cenkalti/backoff/v4 v4.3.0/go.mod h1:Y3VNntkOUPxTVeUxJ/G5vcM//AlwfmyYozVcomhLiZE= github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs= github.com/cespare/xxhash/v2 v2.3.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= -github.com/compspec/jobspec-go v0.0.0-20240510054255-ee02cdc7d3d4 h1:4MaTp3OcUmp6HFEojeI//GthUt7GMYnB8K5OSZdKxZA= -github.com/compspec/jobspec-go v0.0.0-20240510054255-ee02cdc7d3d4/go.mod h1:BaJyxaOhESe2DD4lqBdwTEWOw0TaTZVJGPrFh6KyXQM= 
-github.com/converged-computing/fluxion v0.0.0-20250105140137-04388a62d0fa h1:F5/pXXI5F0jC4XG/nAgN65gBS7+2SYKlwdzkVzawPdI= -github.com/converged-computing/fluxion v0.0.0-20250105140137-04388a62d0fa/go.mod h1:tNlvJY1yFWpp/QqdqJnq8YMGYG99K6YMxDmpu9IVS1E= +github.com/compspec/jobspec-go v0.0.0-20250130030627-58df7d7ed642 h1:kLwazFe8Cl7ZUuF7LidS91IwBjPpcWVVZKEN2VOq6g8= +github.com/compspec/jobspec-go v0.0.0-20250130030627-58df7d7ed642/go.mod h1:BaJyxaOhESe2DD4lqBdwTEWOw0TaTZVJGPrFh6KyXQM= +github.com/converged-computing/fluxion v0.0.0-20250130025038-615e35a80230 h1:Nzr3Jywwinpdo2Tt13RQPiKiMJhwAbxkbmancpPMCTM= +github.com/converged-computing/fluxion v0.0.0-20250130025038-615e35a80230/go.mod h1:tNlvJY1yFWpp/QqdqJnq8YMGYG99K6YMxDmpu9IVS1E= github.com/cpuguy83/go-md2man/v2 v2.0.4/go.mod h1:tgQtvFlXSQOSOSIRvRPT7W67SCa46tRHOmNcaadrF8o= github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= @@ -32,7 +32,6 @@ github.com/fsnotify/fsnotify v1.7.0 h1:8JEhPFa5W2WU7YfeZzPNqzMP6Lwt7L2715Ggo0nos github.com/fsnotify/fsnotify v1.7.0/go.mod h1:40Bi/Hjc2AVfZrqy+aj+yEI+/bRxZnMJyTJwOpGvigM= github.com/fxamacker/cbor/v2 v2.7.0 h1:iM5WgngdRBanHcxugY4JySA0nk1wZorNOpTgCMedv5E= github.com/fxamacker/cbor/v2 v2.7.0/go.mod h1:pxXPTn3joSm21Gbwsv0w9OSA2y1HFR9qXEeXQVeNoDQ= -github.com/go-logr/logr v0.1.0/go.mod h1:ixOQHD9gLJUVQQ2ZOR7zLEifBX6tGkNJF4QyIY7sIas= github.com/go-logr/logr v1.2.2/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A= github.com/go-logr/logr v1.4.2 h1:6pFjapn8bFcIbiKo3XT4j/BhANplGihG6tvd+8rYgrY= github.com/go-logr/logr v1.4.2/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY= @@ -81,8 +80,6 @@ github.com/jackc/pgpassfile v1.0.0 h1:/6Hmqy13Ss2zCq62VdNG8tM1wchn8zjSGOBJ6icpsI github.com/jackc/pgpassfile v1.0.0/go.mod h1:CEx0iS5ambNFdcRtxPj5JhEz+xB6uRky5eyVu/W2HEg= github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761 h1:iCEnooe7UlwOQYpKFhBabPMi4aNAfoODPEFNiAnClxo= github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761/go.mod h1:5TJZWKEWniPve33vlWYSoGYefn3gLQRzjfDlhSJ9ZKM= -github.com/jackc/pgx v3.6.2+incompatible h1:2zP5OD7kiyR3xzRYMhOcXVvkDZsImVXfj+yIyTQf3/o= -github.com/jackc/pgx v3.6.2+incompatible/go.mod h1:0ZGrqGqkRlliWnWB4zKnWtjbSWbGkVEFm4TeybAXq+I= github.com/jackc/pgx/v5 v5.7.2 h1:mLoDLV6sonKlvjIEsV56SkWNCnuNv531l94GaIzO+XI= github.com/jackc/pgx/v5 v5.7.2/go.mod h1:ncY89UGWxg82EykZUwSpUKEfccBGGYq1xjrOpsbsfGQ= github.com/jackc/puddle/v2 v2.2.2 h1:PR8nw+E/1w0GLuRFSmiioY6UooMp6KJv0/61nB7icHo= @@ -280,8 +277,6 @@ k8s.io/client-go v0.31.2 h1:Y2F4dxU5d3AQj+ybwSMqQnpZH9F30//1ObxOKlTI9yc= k8s.io/client-go v0.31.2/go.mod h1:NPa74jSVR/+eez2dFsEIHNa+3o09vtNaWwWwb1qSxSs= k8s.io/component-base v0.31.2 h1:Z1J1LIaC0AV+nzcPRFqfK09af6bZ4D1nAOpWsy9owlA= k8s.io/component-base v0.31.2/go.mod h1:9PeyyFN/drHjtJZMCTkSpQJS3U9OXORnHQqMLDz0sUQ= -k8s.io/klog v1.0.0 h1:Pt+yjF5aB1xDSVbau4VsWe+dQNzA0qv1LlXdC2dF6Q8= -k8s.io/klog v1.0.0/go.mod h1:4Bi6QPql/J/LkTDqv7R/cd3hPo4k2DG6Ptcz060Ez5I= k8s.io/klog/v2 v2.130.1 h1:n9Xl7H1Xvksem4KFG4PYbdQCQxqc/tTUyrgXaOhHSzk= k8s.io/klog/v2 v2.130.1/go.mod h1:3Jpz1GvMt720eyJH1ckRHK1EDfpxISzJ7I9OYgaDtPE= k8s.io/kube-openapi v0.0.0-20240228011516-70dd3763d340 h1:BZqlfIlq5YbRMFko6/PM7FjZpUb45WallggurYhKGag= diff --git a/hack/quick-build-kind.sh 
b/hack/quick-build-kind.sh index 12fe2e4..f3acbdd 100755 --- a/hack/quick-build-kind.sh +++ b/hack/quick-build-kind.sh @@ -28,7 +28,7 @@ helm install \ --set scheduler.image=${REGISTRY}/fluxqueue-scheduler:latest \ --set postgres.image=${REGISTRY}/fluxqueue-postgres:latest \ --set controllerManager.manager.imagePullPolicy=Never \ - --set controllerManager.fluxion.image.tag=grow-api \ + --set controllerManager.fluxion.image.tag=satisfy \ --namespace ${NAMESPACE} \ --create-namespace \ --set scheduler.pullPolicy=Never \ diff --git a/hack/quick-deploy-kind.sh b/hack/quick-deploy-kind.sh index c7b57fc..c6ca9df 100755 --- a/hack/quick-deploy-kind.sh +++ b/hack/quick-deploy-kind.sh @@ -25,7 +25,7 @@ helm install \ --set scheduler.image=${REGISTRY}/fluxqueue-scheduler:latest \ --set postgres.image=${REGISTRY}/fluxqueue-postgres:latest \ --set controllerManager.manager.imagePullPolicy=Never \ - --set controllerManager.fluxion.image.tag=grow-api \ + --set controllerManager.fluxion.image.tag=satisfy \ --namespace ${NAMESPACE} \ --create-namespace \ --set scheduler.pullPolicy=Never \ diff --git a/pkg/fluxqueue/fluxqueue.go b/pkg/fluxqueue/fluxqueue.go index 819154a..f685a7d 100644 --- a/pkg/fluxqueue/fluxqueue.go +++ b/pkg/fluxqueue/fluxqueue.go @@ -192,7 +192,8 @@ func (q *Queue) Enqueue(spec *api.FluxJob) (types.EnqueueStatus, error) { spec.Spec.Type, reservation, spec.Spec.Duration, - spec.Spec.Nodes, + spec.Spec.Slots, + spec.Spec.Cores, ) // If unknown, we won't give status submit, and it should requeue to try again diff --git a/pkg/fluxqueue/queries/queries.go b/pkg/fluxqueue/queries/queries.go index 34bda74..6c4fa7b 100644 --- a/pkg/fluxqueue/queries/queries.go +++ b/pkg/fluxqueue/queries/queries.go @@ -8,7 +8,7 @@ const ( IsPendingQuery = "select * from pending_queue where name = $1 and namespace = $2;" // Insert into pending queue (assumes after above query, we've checked it does not exist) - InsertIntoPending = "insert into pending_queue (jobspec, flux_job_name, namespace, name, type, reservation, duration, size) values ($1, $2, $3, $4, $5, $6, $7, $8);" + InsertIntoPending = "insert into pending_queue (jobspec, flux_job_name, namespace, name, type, reservation, duration, size, cores) values ($1, $2, $3, $4, $5, $6, $7, $8, $9);" // TODO add back created_at // We remove from pending to allow another group submission of the same name on cleanup @@ -16,7 +16,7 @@ const ( // Easy Queries to get jobs // Select jobs based on creation timestamp - SelectPendingByCreation = "select jobspec, name, flux_job_name, namespace, type, reservation, duration, size from pending_queue order by created_at desc;" + SelectPendingByCreation = "select jobspec, name, flux_job_name, namespace, type, reservation, duration, size, cores from pending_queue order by created_at desc;" // Reservations AddReservationQuery = "insert into reservations (name, flux_id) values ($1, $2);" diff --git a/pkg/fluxqueue/strategy/easy.go b/pkg/fluxqueue/strategy/easy.go index 5dce6f7..dd1c4f6 100644 --- a/pkg/fluxqueue/strategy/easy.go +++ b/pkg/fluxqueue/strategy/easy.go @@ -199,6 +199,7 @@ func (s EasyBackfill) ReadyJobs(ctx context.Context, pool *pgxpool.Pool) ([]work Reservation: model.Reservation, Size: model.Size, Duration: model.Duration, + Cores: model.Cores, } jobs = append(jobs, jobArgs) } diff --git a/pkg/fluxqueue/strategy/workers/job.go b/pkg/fluxqueue/strategy/workers/job.go index 3459a28..2e80399 100644 --- a/pkg/fluxqueue/strategy/workers/job.go +++ b/pkg/fluxqueue/strategy/workers/job.go @@ -2,7 +2,6 @@ package workers 
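Enqueue now forwards both Slots and Cores, and the pending-queue insert carries nine values with cores as the last column. A small sketch, not from this changeset, of issuing that insert with pgx; the helper name and parameter list are assumptions for illustration:

package example

import (
	"context"

	"github.com/converged-computing/fluxqueue/pkg/fluxqueue/queries"
	"github.com/jackc/pgx/v5/pgxpool"
)

// insertPending is a hypothetical helper showing the nine bind parameters that
// InsertIntoPending now expects, with cores added as the final column.
func insertPending(
	ctx context.Context,
	pool *pgxpool.Pool,
	jobspec, fluxJobName, namespace, name, jobType string,
	reservation, duration, size, cores int32,
) error {
	_, err := pool.Exec(ctx, queries.InsertIntoPending,
		jobspec, fluxJobName, namespace, name, jobType,
		reservation, duration, size, cores)
	return err
}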
import ( "context" - "encoding/json" "fmt" "os" "strings" @@ -16,7 +15,7 @@ import ( api "github.com/converged-computing/fluxqueue/api/v1alpha1" "github.com/converged-computing/fluxqueue/pkg/defaults" "github.com/converged-computing/fluxqueue/pkg/fluxqueue/queries" - "github.com/converged-computing/fluxqueue/pkg/types" + jgf "github.com/converged-computing/fluxqueue/pkg/jgf" "github.com/riverqueue/river" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" patchTypes "k8s.io/apimachinery/pkg/types" @@ -54,6 +53,10 @@ type JobArgs struct { FluxJobName string `json:"flux_job_name"` Type string `json:"type"` + // This is the number of cores per pod + // We use this to calculate / create a final node list + Cores int32 `json:"cores"` + // If true, we are allowed to ask Fluxion for a reservation Reservation int32 `json:"reservation"` Duration int32 `json:"duration"` @@ -66,16 +69,22 @@ type JobArgs struct { // Work performs the AskFlux action. Any error returned that is due to not having resources means // the job will remain in the worker queue to AskFluxion again. func (w JobWorker) Work(ctx context.Context, job *river.Job[JobArgs]) error { - wlog.Info("Asking Fluxion to schedule job", "Namespace", job.Args.Namespace, "Name", job.Args.Name, "Nodes", job.Args.Size) + wlog.Info("Asking Fluxion to schedule job", + "Namespace", job.Args.Namespace, "Name", job.Args.Name, "Nodes", job.Args.Size) + + fmt.Println(job.Args.Jobspec) // Let's ask Flux if we can allocate nodes for the job! fluxionCtx, cancel := context.WithTimeout(context.Background(), 200*time.Second) defer cancel() // Prepare the request to allocate - convert string to bytes + // This Jobspec includes all slots (pods) so we get an allocation that considers that + // We assume reservation allows for the satisfy to be in the future request := &pb.MatchRequest{Jobspec: job.Args.Jobspec, Reservation: job.Args.Reservation == 1} // This is the host where fluxion is running, will be localhost 4242 for sidecar + // TODO try again to put this client on the class so we don't connect each time fluxion, err := client.NewClient("127.0.0.1:4242") if err != nil { wlog.Error(err, "Fluxion error connecting to server") @@ -83,14 +92,14 @@ func (w JobWorker) Work(ctx context.Context, job *river.Job[JobArgs]) error { } defer fluxion.Close() - // An error here is an error with making the request, nothing about - // the match/allocation itself. + // An error here is an error with making the request, nothing about the allocation response, err := fluxion.Match(fluxionCtx, request) if err != nil { - wlog.Info("[WORK] Fluxion did not receive any match response", "Error", err) + wlog.Info("[WORK] Fluxion did not receive any satisfy response", "Error", err) return err } + // For each node assignment, we make an exact job with that request // If we asked for a reservation, and it wasn't reserved AND not allocated, this means it's not possible // We currently don't have grow/shrink added so this means it will never be possible. // We will unsuspend the job but add a label that indicates it is not schedulable. @@ -116,7 +125,7 @@ func (w JobWorker) Work(ctx context.Context, job *river.Job[JobArgs]) error { } // Now get the nodes. 
These are actually cores assigned to nodes, so we need to keep count - nodes, err := parseNodes(response.Allocation) + nodes, cancelResponses, err := parseNodes(response.Allocation, job.Args.Cores) if err != nil { wlog.Info("Error parsing nodes from fluxion response", "Namespace", job.Args.Namespace, "Name", job.Args.Name, "Error", err) return err @@ -124,7 +133,7 @@ func (w JobWorker) Work(ctx context.Context, job *river.Job[JobArgs]) error { wlog.Info("Fluxion allocation response", "Nodes", nodes) // Unsuspend the job or ungate the pods, adding the node assignments as labels for the scheduler - err = w.releaseJob(ctx, job.Args, fluxID, nodes) + err = w.releaseJob(ctx, job.Args, fluxID, nodes, cancelResponses) if err != nil { return err } @@ -133,7 +142,7 @@ func (w JobWorker) Work(ctx context.Context, job *river.Job[JobArgs]) error { } // Release job will unsuspend a job or ungate pods to allow for scheduling -func (w JobWorker) releaseJob(ctx context.Context, args JobArgs, fluxID int64, nodes []string) error { +func (w JobWorker) releaseJob(ctx context.Context, args JobArgs, fluxID int64, nodes []string, cancelResponses []string) error { var err error if args.Type == api.JobWrappedJob.String() { @@ -197,33 +206,179 @@ func (w JobWorker) reserveJob(ctx context.Context, args JobArgs, fluxID int64) e // parseNodes parses the allocation nodes into a lookup with core counts // We will add these as labels onto each pod for the scheduler, or as one -func parseNodes(allocation string) ([]string, error) { +// This means that we get back some allocation graph with the slot defined at cores, +// so the group size will likely not coincide with the number of nodes. For +// this reason, we have to divide to place them. The final number should +// match the group size. +func parseNodes(allocation string, cores int32) ([]string, []string, error) { // We can eventually send over more metadata, for now just a list of nodes - nodesWithCores := map[string]int{} nodes := []string{} - // The response is the graph with assignments. Here we parse the graph into a struct to get nodes. - var graph types.AllocationResponse - err := json.Unmarshal([]byte(allocation), &graph) + // We also need to save a corresponding cancel request + cancelRequests := []string{} + + // Deserialize the allocation back into a JGF graph + g, err := jgf.LoadFluxJGF(allocation) if err != nil { - return nodes, err + return nodes, cancelRequests, err + } + fmt.Println(g) + + // For each pod, we will need to be able to do partial cancel. + // We can do this by saving the initial graph (without cores) + // and adding them on to the cancel request. We first need a lookup + // for the path between cluster->subnet->nodes->cores. + // This logic will need to be updated if we change the graph. 
+ nodeLookup := map[string]jgf.Node{} + + // Store nodes based on paths + nodePaths := map[string]jgf.Node{} + edgeLookup := map[string][]jgf.Edge{} + + // Parse nodes first so we can match the containment path to the host + for _, node := range g.Graph.Nodes { + nodeLookup[node.Id] = node + nodePaths[node.Metadata.Paths["containment"]] = node + } + + // The edge lookup will allow us to add connected nodes + // We need to be able to map a node path to a list of edges + // The node path gets us the node id (source) + var addEdge = func(node *jgf.Node, edge *jgf.Edge) { + path := node.Metadata.Paths["containment"] + _, ok := edgeLookup[path] + if !ok { + edgeLookup[path] = []jgf.Edge{} + } + edgeLookup[path] = append(edgeLookup[path], *edge) + } + for _, edge := range g.Graph.Edges { + targetNode := nodeLookup[edge.Target] + sourceNode := nodeLookup[edge.Source] + addEdge(&targetNode, &edge) + addEdge(&sourceNode, &edge) } - // To start, just parse nodes and not cores (since we can't bind on level of core) - for _, node := range graph.Graph.Nodes { + // Parse nodes first so we can match the containment path to the host + lookup := map[string]string{} + for _, node := range g.Graph.Nodes { + nodePath := node.Metadata.Paths["containment"] + nodeLookup[fmt.Sprintf("%d", node.Metadata.Id)] = node if node.Metadata.Type == "node" { nodeId := node.Metadata.Basename - _, ok := nodesWithCores[nodeId] + lookup[nodePath] = nodeId + } + } + + // We also need to know the exact cores that are assigned to each node + coresByNode := map[string][]jgf.Node{} + + // We are going to first make a count of cores per node. We do this + // by parsing the containment path. It should always look like: + // "/cluster0/0/kind-worker1/core0" for a core + coreCounts := map[string]int32{} + for _, node := range g.Graph.Nodes { + path := node.Metadata.Paths["containment"] + + if node.Metadata.Type == "core" { + coreName := fmt.Sprintf("core%d", node.Metadata.Id) + nodePath := strings.TrimSuffix(path, "/"+coreName) + nodeId, ok := lookup[nodePath] + + // This shouldn't happen, but if it does, we should catch it if !ok { - nodesWithCores[nodeId] = 0 - nodes = append(nodes, nodeId) + return nodes, cancelRequests, fmt.Errorf("unknown node path %s", nodePath) + } + + // Update core counts for the node + _, ok = coreCounts[nodeId] + if !ok { + coreCounts[nodeId] = int32(0) + } + + // Each core is one + coreCounts[nodeId] += 1 + + // This is a list of core graph nodes assigned to the physical node + // We do this based on ids so we can use the edge lookup + assignedCores, ok := coresByNode[nodeId] + if !ok { + assignedCores = []jgf.Node{} + } + assignedCores = append(assignedCores, node) + coresByNode[nodeId] = assignedCores + } + } + fmt.Printf("Distributing %d cores per pod into core counts ", cores) + fmt.Println(coreCounts) + + // Now we need to divide by the slot size (number of cores per pod) + // and add those nodes to a list (there will be repeats). For each slot + // (pod) we need to generate a JGF that includes resources for cancel. 
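	// Worked example (illustrative numbers, not from this changeset): if the match
	// returns 6 cores on kind-worker1 and 2 cores on kind-worker2, and each slot
	// (pod) needs cores=2, then kind-worker1 contributes 6/2 = 3 entries and
	// kind-worker2 contributes 2/2 = 1 entry, so the node list below has 4 entries,
	// matching a group size of 4 pods. Integer division means leftover cores on a
	// node (e.g., 7 assigned cores with cores=2) do not form another slot.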
+ for nodeId, totalCores := range coreCounts { + fmt.Printf("Node %s has %d cores across slots to fit %d core(s) per slot\n", nodeId, totalCores, cores) + numberSlots := totalCores / cores + for _ = range int32(numberSlots) { + + // Prepare a graph for a cancel response + graph := jgf.NewFluxJGF() + seenEdges := map[string]bool{} + coreNodes := coresByNode[nodeId] + + // addNewEdges to the graph Edges if we haven't yet + var addNewEdges = func(path string) { + addEdges, ok := edgeLookup[path] + if ok { + for _, addEdge := range addEdges { + edgeId := fmt.Sprintf("%s-%s", addEdge.Source, addEdge.Target) + _, alreadyAdded := seenEdges[edgeId] + if !alreadyAdded { + graph.Graph.Edges = append(graph.Graph.Edges, addEdge) + seenEdges[edgeId] = true + } + } + } + } + + // The cancel response needs only units from the graph associated + // with the specific cores assigned. + for _, coreNode := range coreNodes { + path := coreNode.Metadata.Paths["containment"] + _, ok := graph.NodeMap[path] + if !ok { + graph.NodeMap[path] = coreNode + graph.Graph.Nodes = append(graph.Graph.Nodes, coreNode) + addNewEdges(path) + } + // Parse the entire path and add nodes up root + parts := strings.Split(path, "/") + for idx := range len(parts) { + if idx == 0 { + continue + } + path := strings.Join(parts[0:idx], "/") + fmt.Println(path) + _, ok := graph.NodeMap[path] + if !ok { + graph.NodeMap[path] = nodePaths[path] + graph.Graph.Nodes = append(graph.Graph.Nodes, nodePaths[path]) + addNewEdges(path) + } + } + } + nodes = append(nodes, nodeId) + + // Serialize the cancel request to string + graphStr, err := graph.ToJson() + if err != nil { + return nodes, cancelRequests, err } - // Keep a record of cores assigned per node - nodesWithCores[nodeId] += 1 + cancelRequests = append(cancelRequests, graphStr) + fmt.Println(graphStr) } } - return nodes, nil + return nodes, cancelRequests, nil } // Unsuspend the job, adding an annotation for nodes along with the fluxion scheduler diff --git a/pkg/fluxqueue/types/types.go b/pkg/fluxqueue/types/types.go index c35a914..ce699d6 100644 --- a/pkg/fluxqueue/types/types.go +++ b/pkg/fluxqueue/types/types.go @@ -29,6 +29,7 @@ type JobModel struct { Reservation int32 `db:"reservation"` Duration int32 `db:"duration"` Size int32 `db:"size"` + Cores int32 `db:"cores"` } type ReservationModel struct { diff --git a/pkg/jgf/jgf.go b/pkg/jgf/jgf.go index 284c9d0..12ca953 100644 --- a/pkg/jgf/jgf.go +++ b/pkg/jgf/jgf.go @@ -51,6 +51,13 @@ func NewFluxJGF() FluxJGF { } } +// Load a graph payload into a JGF structure +func LoadFluxJGF(payload string) (FluxJGF, error) { + var graph FluxJGF + err := json.Unmarshal([]byte(payload), &graph) + return graph, err +} + // ToJson returns a Json string of the graph func (g *FluxJGF) ToJson() (string, error) { toprint, err := json.MarshalIndent(g.Graph, "", "\t") @@ -81,7 +88,7 @@ func (g *FluxJGF) MakeBidirectionalEdge(parent, child string) { // MakeEdge creates an edge for the JGF func (g *FluxJGF) MakeEdge(source string, target string, contains string) { - newedge := edge{ + newedge := Edge{ Source: source, Target: target, Metadata: edgeMetadata{Subsystem: containmentKey}, diff --git a/pkg/jgf/types.go b/pkg/jgf/types.go index ca7fe3d..f03cfca 100644 --- a/pkg/jgf/types.go +++ b/pkg/jgf/types.go @@ -8,7 +8,7 @@ type Node struct { Metadata nodeMetadata `json:"metadata,omitempty"` } -type edge struct { +type Edge struct { Source string `json:"source"` Relation string `json:"relation,omitempty"` Target string `json:"target"` @@ -36,7 +36,7 @@ type 
nodeMetadata struct { type graph struct { Nodes []Node `json:"nodes"` - Edges []edge `json:"edges"` + Edges []Edge `json:"edges"` // Metadata metadata `json:"metadata,omitempty"` Directed bool `json:"directed,omitempty"` } diff --git a/pkg/jobspec/jobspec.go b/pkg/jobspec/jobspec.go index 97fd03c..da3a025 100644 --- a/pkg/jobspec/jobspec.go +++ b/pkg/jobspec/jobspec.go @@ -1,45 +1,58 @@ package jobspec import ( - corev1 "k8s.io/api/core/v1" + v1 "github.com/compspec/jobspec-go/pkg/jobspec/v1" ) -// https://github.com/kubernetes/kubectl/blob/master/pkg/describe/describe.go#L4211-L4213 -type Resources struct { - Cpu int32 - Memory int64 - Gpu int64 - Storage int64 - Labels []string -} - -// GeneratePodResources returns resources for a pod, which can -// be used to populate the flux JobSpec -func GeneratePodResources(containers []corev1.Container) *Resources { - - // We will sum cpu and memory across containers - // For GPU, we could make a more complex jobspec, but for now - // assume one container is representative for GPU needed. - resources := Resources{} - - for _, container := range containers { +// NewJobSpec generates a jobspec for some number of slots in a cluster +// We associate each "slot" with a pod, so the request asks for a specific number of cpu. +// We also are assuming now that each pod is equivalent, so slots are equivalent. +// If we want to change this, we will need an ability to define slots of different types. +func NewJobspec(name string, command []string, resources *Resources) (*v1.Jobspec, error) { + + // This is creating the resources for the slot Cores are always set to minimally 1 + slotSpec := newSlotSpec(resources) + + // Create the top level resources spec (with a slot) + rSpec := []v1.Resource{ + { + Type: "slot", + Count: resources.Count, + Label: "default", + With: slotSpec, + }, + } - // Add on Cpu, Memory, GPU from container requests - // This is a limited set of resources owned by the pod - resources.Cpu += int32(container.Resources.Requests.Cpu().Value()) - resources.Memory += container.Resources.Requests.Memory().Value() - resources.Storage += container.Resources.Requests.StorageEphemeral().Value() + // Create the task spec + tasks := []v1.Tasks{ + { + Command: command, + Slot: "default", + Count: v1.Count{PerSlot: 1}, + }, + } - // We assume that a pod (node) only has access to the same GPU - gpus, ok := container.Resources.Limits["nvidia.com/gpu"] - if ok { - resources.Gpu += gpus.Value() - } + // Start preparing the spec + spec := v1.Jobspec{ + Version: 1, + Resources: rSpec, + Tasks: tasks, } + return &spec, nil +} - // If we have zero cpus, assume 1 - if resources.Cpu == 0 { - resources.Cpu = 1 +// newSlotSpec creates a spec for one slot, which is one pod (a set of containers) +func newSlotSpec(resources *Resources) []v1.Resource { + slotSpec := []v1.Resource{ + {Type: "core", Count: resources.Slot.Cpu}, + } + // If we have memory or gpu specified, they are appended + if resources.Slot.Gpu > 0 { + slotSpec = append(slotSpec, v1.Resource{Type: "gpu", Count: int32(resources.Slot.Gpu)}) + } + if resources.Slot.Memory > 0 { + toMB := resources.Slot.Memory >> 20 + slotSpec = append(slotSpec, v1.Resource{Type: "memory", Count: int32(toMB)}) } - return &resources + return slotSpec } diff --git a/pkg/jobspec/resources.go b/pkg/jobspec/resources.go new file mode 100644 index 0000000..fe1d671 --- /dev/null +++ b/pkg/jobspec/resources.go @@ -0,0 +1,54 @@ +package jobspec + +import ( + corev1 "k8s.io/api/core/v1" +) + +// 
https://github.com/kubernetes/kubectl/blob/master/pkg/describe/describe.go#L4211-L4213 +// We want each slot to coincide with one pod. We will ask flux for all N slots to schedule, +// and then based on the response we get back, assign specific nodes. This currently +// assumes slots are each the same, but this is subject to change. +// QUESTION: what to add here for labels? +type Resources struct { + Labels []string + Slot Slot + Count int32 +} + +type Slot struct { + Cpu int32 + Memory int64 + Gpu int64 + Storage int64 +} + +// GeneratePodResources returns resources for a pod, which can +// be used to populate the flux JobSpec. +func GeneratePodResources(containers []corev1.Container, slots int32) *Resources { + + // We will sum cpu and memory across containers + // For GPU, we could make a more complex jobspec, but for now + // assume one container is representative for GPU needed. + resources := Resources{Slot: Slot{}, Count: slots} + + for _, container := range containers { + + // Add on Cpu, Memory, GPU from container requests + // This is a limited set of resources owned by the pod + resources.Slot.Cpu += int32(container.Resources.Requests.Cpu().Value()) + resources.Slot.Memory += container.Resources.Requests.Memory().Value() + resources.Slot.Storage += container.Resources.Requests.StorageEphemeral().Value() + + // We assume that a pod (node) only has access to the same GPU + gpus, ok := container.Resources.Limits["nvidia.com/gpu"] + if ok { + resources.Slot.Gpu += gpus.Value() + } + } + + // If we have zero cpus, assume 1 + if resources.Slot.Cpu == 0 { + resources.Slot.Cpu = 1 + } + return &resources +} diff --git a/pkg/types/types.go b/pkg/types/types.go deleted file mode 100644 index 96cb6cc..0000000 --- a/pkg/types/types.go +++ /dev/null @@ -1,29 +0,0 @@ -package types - -type AllocationResponse struct { - Graph Graph `json:"graph"` -} -type Paths struct { - Containment string `json:"containment"` -} -type Node struct { - ID string `json:"id"` - Metadata Metadata `json:"metadata,omitempty"` -} -type Edges struct { - Source string `json:"source"` - Target string `json:"target"` -} -type Metadata struct { - Type string `json:"type"` - Id int32 `json:"id"` - Rank int32 `json:"rank"` - Basename string `json:"basename"` - Exclusive bool `json:"exclusive"` - Paths map[string]string `json:"paths"` -} - -type Graph struct { - Nodes []Node `json:"nodes"` - Edges []Edges `json:"edges"` -}
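With pkg/types removed, an allocation payload from Fluxion is now parsed through jgf.LoadFluxJGF instead of the old AllocationResponse struct. A minimal sketch, assuming the payload keeps the top-level "graph" key the old struct used; the payload string here is a trivial illustration:

package main

import (
	"fmt"

	jgf "github.com/converged-computing/fluxqueue/pkg/jgf"
)

func main() {
	// A real payload comes back from fluxion.Match as response.Allocation
	payload := `{"graph": {"nodes": [], "edges": []}}`

	g, err := jgf.LoadFluxJGF(payload)
	if err != nil {
		panic(err)
	}

	// Collect the hostnames of assigned physical nodes, as parseNodes does
	hosts := []string{}
	for _, node := range g.Graph.Nodes {
		if node.Metadata.Type == "node" {
			hosts = append(hosts, node.Metadata.Basename)
		}
	}
	fmt.Println(hosts)
}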