Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature: Partial cancel #9

Open
wants to merge 4 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -239,6 +239,7 @@ SELECT * from reservations;
- We need to use shrink or partial cancel here. Presumably a shrink down to size 0 is equivalent to a cancel.
- [ ] For cancel, we would issue a cancel for every pod associated with a job. How can we avoid that (or is that OK?)
- [ ] We will eventually need another mechanism to move the schedule queue, aside from new submissions
- What if, instead of a loop, we make a reservation every N jobs? Then we wouldn't need a loop.
- [ ] scheduleAt can be used to AskFlux in the future
- [ ] Nodes that are currently assigned need to be taken into account
- Right now they aren't included in resources, but instead should be "given" to Fluxion.
Expand Down
8 changes: 6 additions & 2 deletions api/v1alpha1/fluxjob_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,9 +104,13 @@ type FluxJobSpec struct {
// +optional
Reservation bool `json:"reservation,omitempty"`

// Nodes needed for the job
// Slots needed for the job
// +optional
Nodes int32 `json:"nodes"`
Slots int32 `json:"nodes"`

// Cores per pod (slot)
// +optional
Cores int32 `json:"cores"`

// Resources assigned
// +optional
Expand Down
17 changes: 8 additions & 9 deletions api/v1alpha1/submit.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,13 @@ package v1alpha1

import (
"context"
"fmt"

jobspec "github.com/compspec/jobspec-go/pkg/jobspec/v1"
jspec "github.com/converged-computing/fluxqueue/pkg/jobspec"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
ctrl "sigs.k8s.io/controller-runtime"

"sigs.k8s.io/controller-runtime/pkg/client"
)

Expand All @@ -21,13 +18,14 @@ var (
)

// SubmitFluxJob wraps a pod or job spec into a FluxJob
// We essentially create a CRD for a a FluxJob
// We essentially create a CRD for a FluxJob. Note that we are asking
// for SLOTS and not nodes - a slot can be a part of a node.
func SubmitFluxJob(
ctx context.Context,
jobType JobWrapped,
name string,
namespace string,
nodes int32,
slots int32,
containers []corev1.Container,
) error {

Expand All @@ -51,14 +49,14 @@ func SubmitFluxJob(
slog.Error(err, "Issue with getting job", "Namespace", namespace, "Name", jobName)
return err
}
resources := jspec.GeneratePodResources(containers)
resources := jspec.GeneratePodResources(containers, slots)

// Artificially create a command for the name and namespace
command := fmt.Sprintf("echo %s %s", namespace, name)
command := []string{"echo", namespace, name}

// Generate a jobspec for that many slots (starting simple)
// TODO will need to add GPU and memory here... if Flux supports memory
js, err := jobspec.NewSimpleJobspec(name, command, nodes, resources.Cpu)
js, err := jspec.NewJobspec(name, command, resources)
if err != nil {
slog.Error(err, "Issue with creating job", "Namespace", namespace, "Name", jobName)
return err
Expand All @@ -80,9 +78,10 @@ func SubmitFluxJob(
ObjectMeta: metav1.ObjectMeta{Name: jobName, Namespace: namespace},
Spec: FluxJobSpec{
JobSpec: jsString,
Nodes: nodes,
Slots: slots,
Type: jobType,
Name: name,
Cores: resources.Slot.Cpu,
},
Status: FluxJobStatus{
SubmitStatus: SubmitStatusNew,
Expand Down
3 changes: 2 additions & 1 deletion build/postgres/create-tables.sql
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,8 @@ CREATE TABLE pending_queue (
reservation INTEGER NOT NULL,
duration INTEGER NOT NULL,
created_at timestamptz NOT NULL default NOW(),
size INTEGER NOT NULL
size INTEGER NOT NULL,
cores INTEGER NOT NULL
);

CREATE UNIQUE INDEX pending_index ON pending_queue (name, namespace);
Expand Down
6 changes: 5 additions & 1 deletion chart/templates/fluxjob-crd.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,10 @@ spec:
A FluxJob is a mapping of a Kubernetes abstraction (e.g., job)
into a Flux JobSpec, one that Fluxion can digest.
properties:
cores:
description: Cores per pod (slot)
format: int32
type: integer
duration:
description: Duration is the maximum runtime of the job
format: int32
Expand All @@ -76,7 +80,7 @@ spec:
description: Original name of the job
type: string
nodes:
description: Nodes needed for the job
description: Slots needed for the job
format: int32
type: integer
object:
Expand Down
6 changes: 5 additions & 1 deletion config/crd/bases/jobs.converged-computing.org_fluxjobs.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,10 @@ spec:
A FluxJob is a mapping of a Kubernetes abstraction (e.g., job)
into a Flux JobSpec, one that Fluxion can digest.
properties:
cores:
description: Cores per pod (slot)
format: int32
type: integer
duration:
description: Duration is the maximum runtime of the job
format: int32
Expand All @@ -63,7 +67,7 @@ spec:
description: Original name of the job
type: string
nodes:
description: Nodes needed for the job
description: Slots needed for the job
format: int32
type: integer
object:
Expand Down
15 changes: 8 additions & 7 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,6 @@ module github.com/converged-computing/fluxqueue
go 1.22.0

require (
github.com/compspec/jobspec-go v0.0.0-20240510054255-ee02cdc7d3d4
github.com/converged-computing/fluxion v0.0.0-20250105140137-04388a62d0fa
github.com/jackc/pgx v3.6.2+incompatible
github.com/jackc/pgx/v5 v5.7.2
github.com/onsi/ginkgo/v2 v2.19.0
github.com/onsi/gomega v1.33.1
Expand All @@ -14,14 +11,18 @@ require (
github.com/riverqueue/river/rivershared v0.15.0
google.golang.org/grpc v1.65.0
google.golang.org/protobuf v1.34.2
gopkg.in/yaml.v2 v2.4.0
k8s.io/api v0.31.2
k8s.io/apimachinery v0.31.2
k8s.io/client-go v0.31.2
k8s.io/klog v1.0.0
k8s.io/klog/v2 v2.130.1
sigs.k8s.io/controller-runtime v0.19.3
)

require (
github.com/compspec/jobspec-go v0.0.0-20250130030627-58df7d7ed642 // indirect
github.com/converged-computing/fluxion v0.0.0-20250130025038-615e35a80230 // indirect
)

require (
github.com/antlr4-go/antlr/v4 v4.13.0 // indirect
github.com/asaskevich/govalidator v0.0.0-20190424111038-f61b66f89f4a // indirect
Expand All @@ -30,7 +31,7 @@ require (
github.com/cenkalti/backoff/v4 v4.3.0 // indirect
github.com/cespare/xxhash/v2 v2.3.0 // indirect
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect
github.com/emicklei/go-restful/v3 v3.11.0 // indirect
github.com/emicklei/go-restful/v3 v3.11.0 // indirect
github.com/evanphx/json-patch/v5 v5.9.0 // indirect
github.com/felixge/httpsnoop v1.0.4 // indirect
github.com/fsnotify/fsnotify v1.7.0 // indirect
Expand Down Expand Up @@ -106,11 +107,11 @@ require (
google.golang.org/genproto/googleapis/api v0.0.0-20240528184218-531527333157 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20240701130421-f6361c86f094 // indirect
gopkg.in/inf.v0 v0.9.1 // indirect
gopkg.in/yaml.v2 v2.4.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
k8s.io/apiextensions-apiserver v0.31.2 // indirect
k8s.io/apiserver v0.31.2 // indirect
k8s.io/component-base v0.31.2 // indirect
k8s.io/klog/v2 v2.130.1 // indirect
k8s.io/kube-openapi v0.0.0-20240228011516-70dd3763d340 // indirect
k8s.io/utils v0.0.0-20240711033017-18e509b52bc8 // indirect
sigs.k8s.io/apiserver-network-proxy/konnectivity-client v0.30.3 // indirect
Expand Down
13 changes: 4 additions & 9 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,10 @@ github.com/cenkalti/backoff/v4 v4.3.0 h1:MyRJ/UdXutAwSAT+s3wNd7MfTIcy71VQueUuFK3
github.com/cenkalti/backoff/v4 v4.3.0/go.mod h1:Y3VNntkOUPxTVeUxJ/G5vcM//AlwfmyYozVcomhLiZE=
github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs=
github.com/cespare/xxhash/v2 v2.3.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
github.com/compspec/jobspec-go v0.0.0-20240510054255-ee02cdc7d3d4 h1:4MaTp3OcUmp6HFEojeI//GthUt7GMYnB8K5OSZdKxZA=
github.com/compspec/jobspec-go v0.0.0-20240510054255-ee02cdc7d3d4/go.mod h1:BaJyxaOhESe2DD4lqBdwTEWOw0TaTZVJGPrFh6KyXQM=
github.com/converged-computing/fluxion v0.0.0-20250105140137-04388a62d0fa h1:F5/pXXI5F0jC4XG/nAgN65gBS7+2SYKlwdzkVzawPdI=
github.com/converged-computing/fluxion v0.0.0-20250105140137-04388a62d0fa/go.mod h1:tNlvJY1yFWpp/QqdqJnq8YMGYG99K6YMxDmpu9IVS1E=
github.com/compspec/jobspec-go v0.0.0-20250130030627-58df7d7ed642 h1:kLwazFe8Cl7ZUuF7LidS91IwBjPpcWVVZKEN2VOq6g8=
github.com/compspec/jobspec-go v0.0.0-20250130030627-58df7d7ed642/go.mod h1:BaJyxaOhESe2DD4lqBdwTEWOw0TaTZVJGPrFh6KyXQM=
github.com/converged-computing/fluxion v0.0.0-20250130025038-615e35a80230 h1:Nzr3Jywwinpdo2Tt13RQPiKiMJhwAbxkbmancpPMCTM=
github.com/converged-computing/fluxion v0.0.0-20250130025038-615e35a80230/go.mod h1:tNlvJY1yFWpp/QqdqJnq8YMGYG99K6YMxDmpu9IVS1E=
github.com/cpuguy83/go-md2man/v2 v2.0.4/go.mod h1:tgQtvFlXSQOSOSIRvRPT7W67SCa46tRHOmNcaadrF8o=
github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
Expand All @@ -32,7 +32,6 @@ github.com/fsnotify/fsnotify v1.7.0 h1:8JEhPFa5W2WU7YfeZzPNqzMP6Lwt7L2715Ggo0nos
github.com/fsnotify/fsnotify v1.7.0/go.mod h1:40Bi/Hjc2AVfZrqy+aj+yEI+/bRxZnMJyTJwOpGvigM=
github.com/fxamacker/cbor/v2 v2.7.0 h1:iM5WgngdRBanHcxugY4JySA0nk1wZorNOpTgCMedv5E=
github.com/fxamacker/cbor/v2 v2.7.0/go.mod h1:pxXPTn3joSm21Gbwsv0w9OSA2y1HFR9qXEeXQVeNoDQ=
github.com/go-logr/logr v0.1.0/go.mod h1:ixOQHD9gLJUVQQ2ZOR7zLEifBX6tGkNJF4QyIY7sIas=
github.com/go-logr/logr v1.2.2/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A=
github.com/go-logr/logr v1.4.2 h1:6pFjapn8bFcIbiKo3XT4j/BhANplGihG6tvd+8rYgrY=
github.com/go-logr/logr v1.4.2/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY=
Expand Down Expand Up @@ -81,8 +80,6 @@ github.com/jackc/pgpassfile v1.0.0 h1:/6Hmqy13Ss2zCq62VdNG8tM1wchn8zjSGOBJ6icpsI
github.com/jackc/pgpassfile v1.0.0/go.mod h1:CEx0iS5ambNFdcRtxPj5JhEz+xB6uRky5eyVu/W2HEg=
github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761 h1:iCEnooe7UlwOQYpKFhBabPMi4aNAfoODPEFNiAnClxo=
github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761/go.mod h1:5TJZWKEWniPve33vlWYSoGYefn3gLQRzjfDlhSJ9ZKM=
github.com/jackc/pgx v3.6.2+incompatible h1:2zP5OD7kiyR3xzRYMhOcXVvkDZsImVXfj+yIyTQf3/o=
github.com/jackc/pgx v3.6.2+incompatible/go.mod h1:0ZGrqGqkRlliWnWB4zKnWtjbSWbGkVEFm4TeybAXq+I=
github.com/jackc/pgx/v5 v5.7.2 h1:mLoDLV6sonKlvjIEsV56SkWNCnuNv531l94GaIzO+XI=
github.com/jackc/pgx/v5 v5.7.2/go.mod h1:ncY89UGWxg82EykZUwSpUKEfccBGGYq1xjrOpsbsfGQ=
github.com/jackc/puddle/v2 v2.2.2 h1:PR8nw+E/1w0GLuRFSmiioY6UooMp6KJv0/61nB7icHo=
Expand Down Expand Up @@ -280,8 +277,6 @@ k8s.io/client-go v0.31.2 h1:Y2F4dxU5d3AQj+ybwSMqQnpZH9F30//1ObxOKlTI9yc=
k8s.io/client-go v0.31.2/go.mod h1:NPa74jSVR/+eez2dFsEIHNa+3o09vtNaWwWwb1qSxSs=
k8s.io/component-base v0.31.2 h1:Z1J1LIaC0AV+nzcPRFqfK09af6bZ4D1nAOpWsy9owlA=
k8s.io/component-base v0.31.2/go.mod h1:9PeyyFN/drHjtJZMCTkSpQJS3U9OXORnHQqMLDz0sUQ=
k8s.io/klog v1.0.0 h1:Pt+yjF5aB1xDSVbau4VsWe+dQNzA0qv1LlXdC2dF6Q8=
k8s.io/klog v1.0.0/go.mod h1:4Bi6QPql/J/LkTDqv7R/cd3hPo4k2DG6Ptcz060Ez5I=
k8s.io/klog/v2 v2.130.1 h1:n9Xl7H1Xvksem4KFG4PYbdQCQxqc/tTUyrgXaOhHSzk=
k8s.io/klog/v2 v2.130.1/go.mod h1:3Jpz1GvMt720eyJH1ckRHK1EDfpxISzJ7I9OYgaDtPE=
k8s.io/kube-openapi v0.0.0-20240228011516-70dd3763d340 h1:BZqlfIlq5YbRMFko6/PM7FjZpUb45WallggurYhKGag=
Expand Down
2 changes: 1 addition & 1 deletion hack/quick-build-kind.sh
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ helm install \
--set scheduler.image=${REGISTRY}/fluxqueue-scheduler:latest \
--set postgres.image=${REGISTRY}/fluxqueue-postgres:latest \
--set controllerManager.manager.imagePullPolicy=Never \
--set controllerManager.fluxion.image.tag=grow-api \
--set controllerManager.fluxion.image.tag=satisfy \
--namespace ${NAMESPACE} \
--create-namespace \
--set scheduler.pullPolicy=Never \
Expand Down
2 changes: 1 addition & 1 deletion hack/quick-deploy-kind.sh
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ helm install \
--set scheduler.image=${REGISTRY}/fluxqueue-scheduler:latest \
--set postgres.image=${REGISTRY}/fluxqueue-postgres:latest \
--set controllerManager.manager.imagePullPolicy=Never \
--set controllerManager.fluxion.image.tag=grow-api \
--set controllerManager.fluxion.image.tag=satisfy \
--namespace ${NAMESPACE} \
--create-namespace \
--set scheduler.pullPolicy=Never \
Expand Down
3 changes: 2 additions & 1 deletion pkg/fluxqueue/fluxqueue.go
Original file line number Diff line number Diff line change
Expand Up @@ -192,7 +192,8 @@ func (q *Queue) Enqueue(spec *api.FluxJob) (types.EnqueueStatus, error) {
spec.Spec.Type,
reservation,
spec.Spec.Duration,
spec.Spec.Nodes,
spec.Spec.Slots,
spec.Spec.Cores,
)

// If unknown, we won't give status submit, and it should requeue to try again
Expand Down
4 changes: 2 additions & 2 deletions pkg/fluxqueue/queries/queries.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,15 +8,15 @@ const (
IsPendingQuery = "select * from pending_queue where name = $1 and namespace = $2;"

// Insert into pending queue (assumes after above query, we've checked it does not exist)
InsertIntoPending = "insert into pending_queue (jobspec, flux_job_name, namespace, name, type, reservation, duration, size) values ($1, $2, $3, $4, $5, $6, $7, $8);"
InsertIntoPending = "insert into pending_queue (jobspec, flux_job_name, namespace, name, type, reservation, duration, size, cores) values ($1, $2, $3, $4, $5, $6, $7, $8, $9);"
// TODO add back created_at

// We remove from pending to allow another group submission of the same name on cleanup
DeleteFromPendingQuery = "delete from pending_queue where name=$1 and namespace=$2;"

// Easy Queries to get jobs
// Select jobs based on creation timestamp
SelectPendingByCreation = "select jobspec, name, flux_job_name, namespace, type, reservation, duration, size from pending_queue order by created_at desc;"
SelectPendingByCreation = "select jobspec, name, flux_job_name, namespace, type, reservation, duration, size, cores from pending_queue order by created_at desc;"

// Reservations
AddReservationQuery = "insert into reservations (name, flux_id) values ($1, $2);"
Expand Down
1 change: 1 addition & 0 deletions pkg/fluxqueue/strategy/easy.go
Original file line number Diff line number Diff line change
Expand Up @@ -199,6 +199,7 @@ func (s EasyBackfill) ReadyJobs(ctx context.Context, pool *pgxpool.Pool) ([]work
Reservation: model.Reservation,
Size: model.Size,
Duration: model.Duration,
Cores: model.Cores,
}
jobs = append(jobs, jobArgs)
}
Expand Down
Loading