From f8013fcf28f8d9ec8ac09c69f76bc7631b367c37 Mon Sep 17 00:00:00 2001 From: Chris Martin Date: Mon, 22 Jul 2024 09:13:53 +0100 Subject: [PATCH 1/4] Remove useAdjustedFairShareProtection flag (#3806) Signed-off-by: Chris Martin Co-authored-by: Chris Martin --- config/scheduler/config.yaml | 1 - .../scheduler/configuration/configuration.go | 2 - .../scheduler/preempting_queue_scheduler.go | 38 ++++++++----------- .../preempting_queue_scheduler_test.go | 3 -- internal/scheduler/scheduling_algo.go | 1 - internal/scheduler/simulator/simulator.go | 1 - .../scheduler/testfixtures/testfixtures.go | 1 - 7 files changed, 16 insertions(+), 31 deletions(-) diff --git a/config/scheduler/config.yaml b/config/scheduler/config.yaml index 1122699bde0..4b281652a3c 100644 --- a/config/scheduler/config.yaml +++ b/config/scheduler/config.yaml @@ -87,7 +87,6 @@ scheduling: disableScheduling: false enableAssertions: false protectedFractionOfFairShare: 1.0 - useAdjustedFairShareProtection: true nodeIdLabel: "kubernetes.io/hostname" priorityClasses: armada-default: diff --git a/internal/scheduler/configuration/configuration.go b/internal/scheduler/configuration/configuration.go index f78bd3fe3e7..b2b9fb1c629 100644 --- a/internal/scheduler/configuration/configuration.go +++ b/internal/scheduler/configuration/configuration.go @@ -167,8 +167,6 @@ type SchedulingConfig struct { EnableAssertions bool // Only queues allocated more than this fraction of their fair share are considered for preemption. ProtectedFractionOfFairShare float64 `validate:"gte=0"` - // Use Max(AdjustedFairShare, FairShare) for fair share protection. If false then FairShare will be used. - UseAdjustedFairShareProtection bool // Armada adds a node selector term to every scheduled pod using this label with the node name as value. // This to force kube-scheduler to schedule pods on the node chosen by Armada. // For example, if NodeIdLabel is "kubernetes.io/hostname" and armada schedules a pod on node "myNode", diff --git a/internal/scheduler/preempting_queue_scheduler.go b/internal/scheduler/preempting_queue_scheduler.go index 19a48c1d386..70981ac214d 100644 --- a/internal/scheduler/preempting_queue_scheduler.go +++ b/internal/scheduler/preempting_queue_scheduler.go @@ -28,13 +28,12 @@ import ( // PreemptingQueueScheduler is a scheduler that makes a unified decisions on which jobs to preempt and schedule. // Uses QueueScheduler as a building block. type PreemptingQueueScheduler struct { - schedulingContext *schedulercontext.SchedulingContext - constraints schedulerconstraints.SchedulingConstraints - floatingResourceTypes *floatingresources.FloatingResourceTypes - protectedFractionOfFairShare float64 - useAdjustedFairShareProtection bool - jobRepo JobRepository - nodeDb *nodedb.NodeDb + schedulingContext *schedulercontext.SchedulingContext + constraints schedulerconstraints.SchedulingConstraints + floatingResourceTypes *floatingresources.FloatingResourceTypes + protectedFractionOfFairShare float64 + jobRepo JobRepository + nodeDb *nodedb.NodeDb // Maps job ids to the id of the node the job is associated with. // For scheduled or running jobs, that is the node the job is assigned to. // For preempted jobs, that is the node the job was preempted from. 
@@ -54,7 +53,6 @@ func NewPreemptingQueueScheduler( constraints schedulerconstraints.SchedulingConstraints, floatingResourceTypes *floatingresources.FloatingResourceTypes, protectedFractionOfFairShare float64, - useAdjustedFairShareProtection bool, jobRepo JobRepository, nodeDb *nodedb.NodeDb, initialNodeIdByJobId map[string]string, @@ -75,16 +73,15 @@ func NewPreemptingQueueScheduler( initialJobIdsByGangId[gangId] = maps.Clone(jobIds) } return &PreemptingQueueScheduler{ - schedulingContext: sctx, - constraints: constraints, - floatingResourceTypes: floatingResourceTypes, - protectedFractionOfFairShare: protectedFractionOfFairShare, - useAdjustedFairShareProtection: useAdjustedFairShareProtection, - jobRepo: jobRepo, - nodeDb: nodeDb, - nodeIdByJobId: maps.Clone(initialNodeIdByJobId), - jobIdsByGangId: initialJobIdsByGangId, - gangIdByJobId: maps.Clone(initialGangIdByJobId), + schedulingContext: sctx, + constraints: constraints, + floatingResourceTypes: floatingResourceTypes, + protectedFractionOfFairShare: protectedFractionOfFairShare, + jobRepo: jobRepo, + nodeDb: nodeDb, + nodeIdByJobId: maps.Clone(initialNodeIdByJobId), + jobIdsByGangId: initialJobIdsByGangId, + gangIdByJobId: maps.Clone(initialGangIdByJobId), } } @@ -136,10 +133,7 @@ func (sch *PreemptingQueueScheduler) Schedule(ctx *armadacontext.Context) (*Sche } if qctx, ok := sch.schedulingContext.QueueSchedulingContexts[job.Queue()]; ok { actualShare := sch.schedulingContext.FairnessCostProvider.UnweightedCostFromQueue(qctx) / totalCost - fairShare := qctx.FairShare - if sch.useAdjustedFairShareProtection { - fairShare = math.Max(qctx.AdjustedFairShare, fairShare) - } + fairShare := math.Max(qctx.AdjustedFairShare, qctx.FairShare) fractionOfFairShare := actualShare / fairShare if fractionOfFairShare <= sch.protectedFractionOfFairShare { return false diff --git a/internal/scheduler/preempting_queue_scheduler_test.go b/internal/scheduler/preempting_queue_scheduler_test.go index 17c2af68718..57c4553a5a5 100644 --- a/internal/scheduler/preempting_queue_scheduler_test.go +++ b/internal/scheduler/preempting_queue_scheduler_test.go @@ -1869,7 +1869,6 @@ func TestPreemptingQueueScheduler(t *testing.T) { constraints, testfixtures.TestEmptyFloatingResources, tc.SchedulingConfig.ProtectedFractionOfFairShare, - tc.SchedulingConfig.UseAdjustedFairShareProtection, NewSchedulerJobRepositoryAdapter(jobDbTxn), nodeDb, nodeIdByJobId, @@ -2224,7 +2223,6 @@ func BenchmarkPreemptingQueueScheduler(b *testing.B) { constraints, testfixtures.TestEmptyFloatingResources, tc.SchedulingConfig.ProtectedFractionOfFairShare, - tc.SchedulingConfig.UseAdjustedFairShareProtection, NewSchedulerJobRepositoryAdapter(jobDbTxn), nodeDb, nil, @@ -2286,7 +2284,6 @@ func BenchmarkPreemptingQueueScheduler(b *testing.B) { constraints, testfixtures.TestEmptyFloatingResources, tc.SchedulingConfig.ProtectedFractionOfFairShare, - tc.SchedulingConfig.UseAdjustedFairShareProtection, NewSchedulerJobRepositoryAdapter(jobDbTxn), nodeDb, nil, diff --git a/internal/scheduler/scheduling_algo.go b/internal/scheduler/scheduling_algo.go index b4b04e27ddc..9e96dce55df 100644 --- a/internal/scheduler/scheduling_algo.go +++ b/internal/scheduler/scheduling_algo.go @@ -498,7 +498,6 @@ func (l *FairSchedulingAlgo) schedulePool( constraints, l.floatingResourceTypes, l.schedulingConfig.ProtectedFractionOfFairShare, - l.schedulingConfig.UseAdjustedFairShareProtection, NewSchedulerJobRepositoryAdapter(fsctx.txn), nodeDb, fsctx.nodeIdByJobId, diff --git a/internal/scheduler/simulator/simulator.go 
b/internal/scheduler/simulator/simulator.go index e975a659d1e..ab74665fbd1 100644 --- a/internal/scheduler/simulator/simulator.go +++ b/internal/scheduler/simulator/simulator.go @@ -520,7 +520,6 @@ func (s *Simulator) handleScheduleEvent(ctx *armadacontext.Context) error { constraints, nloatingResourceTypes, s.schedulingConfig.ProtectedFractionOfFairShare, - s.schedulingConfig.UseAdjustedFairShareProtection, scheduler.NewSchedulerJobRepositoryAdapter(txn), nodeDb, // TODO: Necessary to support partial eviction. diff --git a/internal/scheduler/testfixtures/testfixtures.go b/internal/scheduler/testfixtures/testfixtures.go index 5514c53f17f..740a537afe5 100644 --- a/internal/scheduler/testfixtures/testfixtures.go +++ b/internal/scheduler/testfixtures/testfixtures.go @@ -192,7 +192,6 @@ func TestSchedulingConfig() schedulerconfiguration.SchedulingConfig { ExecutorTimeout: 15 * time.Minute, MaxUnacknowledgedJobsPerExecutor: math.MaxInt, SupportedResourceTypes: GetTestSupportedResourceTypes(), - UseAdjustedFairShareProtection: true, } } From bf3ba9958ee368a9e18d44abdc1a093b17dff6d8 Mon Sep 17 00:00:00 2001 From: robertdavidsmith <34475852+robertdavidsmith@users.noreply.github.com> Date: Mon, 22 Jul 2024 12:50:09 +0100 Subject: [PATCH 2/4] Enable expose profiling port for remaining charts, better profiling error handling (#3809) Co-authored-by: Robert Smith Signed-off-by: Robert Smith --- deployment/armada/templates/deployment.yaml | 2 +- .../armada/templates/profiling-ingress.yaml | 35 +++++++++++++++++++ .../armada/templates/profiling-service.yaml | 19 ++++++++++ .../binoculars/templates/deployment.yaml | 2 +- .../templates/profiling-ingress.yaml | 35 +++++++++++++++++++ .../templates/profiling-service.yaml | 19 ++++++++++ .../event-ingester/templates/deployment.yaml | 2 +- .../templates/profiling-ingress.yaml | 35 +++++++++++++++++++ .../templates/profiling-service.yaml | 19 ++++++++++ deployment/executor/templates/deployment.yaml | 2 +- .../executor/templates/profiling-ingress.yaml | 35 +++++++++++++++++++ .../executor/templates/profiling-service.yaml | 19 ++++++++++ .../templates/deployment.yaml | 2 +- .../templates/profiling-ingress.yaml | 35 +++++++++++++++++++ .../templates/profiling-service.yaml | 19 ++++++++++ .../lookout-v2/templates/deployment.yaml | 2 +- .../templates/profiling-ingress.yaml | 35 +++++++++++++++++++ .../templates/profiling-service.yaml | 19 ++++++++++ .../scheduler-ingester-profiling-service.yaml | 1 - .../scheduler-profiling-service.yaml | 1 - docs/developer/pprof.md | 5 ++- internal/common/profiling/http.go | 19 ++++++---- 22 files changed, 345 insertions(+), 17 deletions(-) create mode 100644 deployment/armada/templates/profiling-ingress.yaml create mode 100644 deployment/armada/templates/profiling-service.yaml create mode 100644 deployment/binoculars/templates/profiling-ingress.yaml create mode 100644 deployment/binoculars/templates/profiling-service.yaml create mode 100644 deployment/event-ingester/templates/profiling-ingress.yaml create mode 100644 deployment/event-ingester/templates/profiling-service.yaml create mode 100644 deployment/executor/templates/profiling-ingress.yaml create mode 100644 deployment/executor/templates/profiling-service.yaml create mode 100644 deployment/lookout-ingester-v2/templates/profiling-ingress.yaml create mode 100644 deployment/lookout-ingester-v2/templates/profiling-service.yaml create mode 100644 deployment/lookout-v2/templates/profiling-ingress.yaml create mode 100644 deployment/lookout-v2/templates/profiling-service.yaml diff 
--git a/deployment/armada/templates/deployment.yaml b/deployment/armada/templates/deployment.yaml index 7683e97108f..c1a84e151df 100644 --- a/deployment/armada/templates/deployment.yaml +++ b/deployment/armada/templates/deployment.yaml @@ -72,7 +72,7 @@ spec: {{- if and .Values.applicationConfig.profiling .Values.applicationConfig.profiling.port }} - containerPort: {{ .Values.applicationConfig.profiling.port }} protocol: TCP - name: pprof + name: profiling {{- end }} volumeMounts: - name: user-config diff --git a/deployment/armada/templates/profiling-ingress.yaml b/deployment/armada/templates/profiling-ingress.yaml new file mode 100644 index 00000000000..be697e47a8b --- /dev/null +++ b/deployment/armada/templates/profiling-ingress.yaml @@ -0,0 +1,35 @@ +{{- if and .Values.applicationConfig.profiling .Values.applicationConfig.profiling.hostnames }} +{{- $root := . -}} +apiVersion: networking.k8s.io/v1 +kind: Ingress +metadata: + name: {{ include "armada.name" . }}-profiling + namespace: {{ .Release.Namespace }} + annotations: + certmanager.k8s.io/cluster-issuer: {{ required "A value is required for .Values.applicationConfig.profiling.clusterIssuer" .Values.applicationConfig.profiling.clusterIssuer }} + cert-manager.io/cluster-issuer: {{ required "A value is required for .Values.applicationConfig.profiling.clusterIssuer" .Values.applicationConfig.profiling.clusterIssuer }} + labels: + {{- include "armada.labels.all" . | nindent 4 }} +spec: + rules: + {{- range required "A value is required for .Values.applicationConfig.profiling.hostnames" .Values.applicationConfig.profiling.hostnames }} + - host: {{ . }} + http: + paths: + - path: / + pathType: Prefix + backend: + service: + name: {{ include "armada.name" $root }}-server-profiling + port: + number: {{ $root.Values.applicationConfig.profiling.port }} + {{ end -}} + tls: + - hosts: + {{- range required "A value is required for .Values.applicationConfig.profiling.hostnames" .Values.applicationConfig.profiling.hostnames }} + - {{ . -}} + {{ end }} + secretName: {{ include "armada.name" $root }}-profiling-service-tls +--- +{{- end }} + diff --git a/deployment/armada/templates/profiling-service.yaml b/deployment/armada/templates/profiling-service.yaml new file mode 100644 index 00000000000..dda82cb1ae5 --- /dev/null +++ b/deployment/armada/templates/profiling-service.yaml @@ -0,0 +1,19 @@ +{{- if and .Values.applicationConfig.profiling .Values.applicationConfig.profiling.port }} +apiVersion: v1 +kind: Service +metadata: + name: {{ include "armada.name" . }}-server-profiling + namespace: {{ .Release.Namespace }} + labels: + {{- include "armada.labels.all" . | nindent 4 }} + name: {{ include "armada.name" . }}-profiling +spec: + selector: + app: {{ include "armada.name" . 
}} + ports: + - name: profiling + protocol: TCP + port: {{ .Values.applicationConfig.profiling.port }} +--- +{{- end }} + diff --git a/deployment/binoculars/templates/deployment.yaml b/deployment/binoculars/templates/deployment.yaml index 2bb8c051a42..51962b2de1f 100644 --- a/deployment/binoculars/templates/deployment.yaml +++ b/deployment/binoculars/templates/deployment.yaml @@ -57,7 +57,7 @@ spec: {{- if and .Values.applicationConfig.profiling .Values.applicationConfig.profiling.port }} - containerPort: {{ .Values.applicationConfig.profiling.port }} protocol: TCP - name: pprof + name: profiling {{- end }} volumeMounts: - name: user-config diff --git a/deployment/binoculars/templates/profiling-ingress.yaml b/deployment/binoculars/templates/profiling-ingress.yaml new file mode 100644 index 00000000000..ab518bea985 --- /dev/null +++ b/deployment/binoculars/templates/profiling-ingress.yaml @@ -0,0 +1,35 @@ +{{- if and .Values.applicationConfig.profiling .Values.applicationConfig.profiling.hostnames }} +{{- $root := . -}} +apiVersion: networking.k8s.io/v1 +kind: Ingress +metadata: + name: {{ include "binoculars.name" . }}-profiling + namespace: {{ .Release.Namespace }} + annotations: + certmanager.k8s.io/cluster-issuer: {{ required "A value is required for .Values.applicationConfig.profiling.clusterIssuer" .Values.applicationConfig.profiling.clusterIssuer }} + cert-manager.io/cluster-issuer: {{ required "A value is required for .Values.applicationConfig.profiling.clusterIssuer" .Values.applicationConfig.profiling.clusterIssuer }} + labels: + {{- include "binoculars.labels.all" . | nindent 4 }} +spec: + rules: + {{- range required "A value is required for .Values.applicationConfig.profiling.hostnames" .Values.applicationConfig.profiling.hostnames }} + - host: {{ . }} + http: + paths: + - path: / + pathType: Prefix + backend: + service: + name: {{ include "binoculars.name" $root }}-profiling + port: + number: {{ $root.Values.applicationConfig.profiling.port }} + {{ end -}} + tls: + - hosts: + {{- range required "A value is required for .Values.applicationConfig.profiling.hostnames" .Values.applicationConfig.profiling.hostnames }} + - {{ . -}} + {{ end }} + secretName: {{ include "binoculars.name" $root }}-profiling-service-tls +--- +{{- end }} + diff --git a/deployment/binoculars/templates/profiling-service.yaml b/deployment/binoculars/templates/profiling-service.yaml new file mode 100644 index 00000000000..455051499eb --- /dev/null +++ b/deployment/binoculars/templates/profiling-service.yaml @@ -0,0 +1,19 @@ +{{- if and .Values.applicationConfig.profiling .Values.applicationConfig.profiling.port }} +apiVersion: v1 +kind: Service +metadata: + name: {{ include "binoculars.name" . }}-profiling + namespace: {{ .Release.Namespace }} + labels: + {{- include "binoculars.labels.all" . | nindent 4 }} + name: {{ include "binoculars.name" . }}-profiling +spec: + selector: + app: {{ include "binoculars.name" . 
}} + ports: + - name: profiling + protocol: TCP + port: {{ .Values.applicationConfig.profiling.port }} +--- +{{- end }} + diff --git a/deployment/event-ingester/templates/deployment.yaml b/deployment/event-ingester/templates/deployment.yaml index 7c7943aba29..93d1ed42890 100644 --- a/deployment/event-ingester/templates/deployment.yaml +++ b/deployment/event-ingester/templates/deployment.yaml @@ -48,7 +48,7 @@ spec: {{- if and .Values.applicationConfig.profiling .Values.applicationConfig.profiling.port }} - containerPort: {{ .Values.applicationConfig.profiling.port }} protocol: TCP - name: pprof + name: profiling {{- end }} - containerPort: {{ .Values.applicationConfig.metricsPort }} protocol: TCP diff --git a/deployment/event-ingester/templates/profiling-ingress.yaml b/deployment/event-ingester/templates/profiling-ingress.yaml new file mode 100644 index 00000000000..e90e88b6070 --- /dev/null +++ b/deployment/event-ingester/templates/profiling-ingress.yaml @@ -0,0 +1,35 @@ +{{- if and .Values.applicationConfig.profiling .Values.applicationConfig.profiling.hostnames }} +{{- $root := . -}} +apiVersion: networking.k8s.io/v1 +kind: Ingress +metadata: + name: {{ include "event_ingester.name" . }}-profiling + namespace: {{ .Release.Namespace }} + annotations: + certmanager.k8s.io/cluster-issuer: {{ required "A value is required for .Values.applicationConfig.profiling.clusterIssuer" .Values.applicationConfig.profiling.clusterIssuer }} + cert-manager.io/cluster-issuer: {{ required "A value is required for .Values.applicationConfig.profiling.clusterIssuer" .Values.applicationConfig.profiling.clusterIssuer }} + labels: + {{- include "event_ingester.labels.all" . | nindent 4 }} +spec: + rules: + {{- range required "A value is required for .Values.applicationConfig.profiling.hostnames" .Values.applicationConfig.profiling.hostnames }} + - host: {{ . }} + http: + paths: + - path: / + pathType: Prefix + backend: + service: + name: {{ include "event_ingester.name" $root }}-profiling + port: + number: {{ $root.Values.applicationConfig.profiling.port }} + {{ end -}} + tls: + - hosts: + {{- range required "A value is required for .Values.applicationConfig.profiling.hostnames" .Values.applicationConfig.profiling.hostnames }} + - {{ . -}} + {{ end }} + secretName: {{ include "event_ingester.name" $root }}-profiling-service-tls +--- +{{- end }} + diff --git a/deployment/event-ingester/templates/profiling-service.yaml b/deployment/event-ingester/templates/profiling-service.yaml new file mode 100644 index 00000000000..9ff3183f978 --- /dev/null +++ b/deployment/event-ingester/templates/profiling-service.yaml @@ -0,0 +1,19 @@ +{{- if and .Values.applicationConfig.profiling .Values.applicationConfig.profiling.port }} +apiVersion: v1 +kind: Service +metadata: + name: {{ include "event_ingester.name" . }}-profiling + namespace: {{ .Release.Namespace }} + labels: + {{- include "event_ingester.labels.all" . | nindent 4 }} + name: {{ include "event_ingester.name" . }}-profiling +spec: + selector: + app: {{ include "event_ingester.name" . 
}} + ports: + - name: profiling + protocol: TCP + port: {{ .Values.applicationConfig.profiling.port }} +--- +{{- end }} + diff --git a/deployment/executor/templates/deployment.yaml b/deployment/executor/templates/deployment.yaml index 1694a53a4d7..abd30ade56f 100644 --- a/deployment/executor/templates/deployment.yaml +++ b/deployment/executor/templates/deployment.yaml @@ -55,7 +55,7 @@ spec: {{- if and .Values.applicationConfig.profiling .Values.applicationConfig.profiling.port }} - containerPort: {{ .Values.applicationConfig.profiling.port }} protocol: TCP - name: pprof + name: profiling {{- end }} volumeMounts: - name: user-config diff --git a/deployment/executor/templates/profiling-ingress.yaml b/deployment/executor/templates/profiling-ingress.yaml new file mode 100644 index 00000000000..f86ee1119f3 --- /dev/null +++ b/deployment/executor/templates/profiling-ingress.yaml @@ -0,0 +1,35 @@ +{{- if and .Values.applicationConfig.profiling .Values.applicationConfig.profiling.hostnames }} +{{- $root := . -}} +apiVersion: networking.k8s.io/v1 +kind: Ingress +metadata: + name: {{ include "executor.name" . }}-profiling + namespace: {{ .Release.Namespace }} + annotations: + certmanager.k8s.io/cluster-issuer: {{ required "A value is required for .Values.applicationConfig.profiling.clusterIssuer" .Values.applicationConfig.profiling.clusterIssuer }} + cert-manager.io/cluster-issuer: {{ required "A value is required for .Values.applicationConfig.profiling.clusterIssuer" .Values.applicationConfig.profiling.clusterIssuer }} + labels: + {{- include "executor.labels.all" . | nindent 4 }} +spec: + rules: + {{- range required "A value is required for .Values.applicationConfig.profiling.hostnames" .Values.applicationConfig.profiling.hostnames }} + - host: {{ . }} + http: + paths: + - path: / + pathType: Prefix + backend: + service: + name: {{ include "executor.name" $root }}-profiling + port: + number: {{ $root.Values.applicationConfig.profiling.port }} + {{ end -}} + tls: + - hosts: + {{- range required "A value is required for .Values.applicationConfig.profiling.hostnames" .Values.applicationConfig.profiling.hostnames }} + - {{ . -}} + {{ end }} + secretName: {{ include "executor.name" $root }}-profiling-service-tls +--- +{{- end }} + diff --git a/deployment/executor/templates/profiling-service.yaml b/deployment/executor/templates/profiling-service.yaml new file mode 100644 index 00000000000..42fe957da66 --- /dev/null +++ b/deployment/executor/templates/profiling-service.yaml @@ -0,0 +1,19 @@ +{{- if and .Values.applicationConfig.profiling .Values.applicationConfig.profiling.port }} +apiVersion: v1 +kind: Service +metadata: + name: {{ include "executor.name" . }}-profiling + namespace: {{ .Release.Namespace }} + labels: + {{- include "executor.labels.all" . | nindent 4 }} + name: {{ include "executor.name" . }}-profiling +spec: + selector: + app: {{ include "executor.name" . 
}} + ports: + - name: profiling + protocol: TCP + port: {{ .Values.applicationConfig.profiling.port }} +--- +{{- end }} + diff --git a/deployment/lookout-ingester-v2/templates/deployment.yaml b/deployment/lookout-ingester-v2/templates/deployment.yaml index efe46d88554..5d4449564dd 100644 --- a/deployment/lookout-ingester-v2/templates/deployment.yaml +++ b/deployment/lookout-ingester-v2/templates/deployment.yaml @@ -48,7 +48,7 @@ spec: {{- if and .Values.applicationConfig.profiling .Values.applicationConfig.profiling.port }} - containerPort: {{ .Values.applicationConfig.profiling.port }} protocol: TCP - name: pprof + name: profiling {{- end }} - containerPort: {{ .Values.applicationConfig.metricsPort }} protocol: TCP diff --git a/deployment/lookout-ingester-v2/templates/profiling-ingress.yaml b/deployment/lookout-ingester-v2/templates/profiling-ingress.yaml new file mode 100644 index 00000000000..2cc39735155 --- /dev/null +++ b/deployment/lookout-ingester-v2/templates/profiling-ingress.yaml @@ -0,0 +1,35 @@ +{{- if and .Values.applicationConfig.profiling .Values.applicationConfig.profiling.hostnames }} +{{- $root := . -}} +apiVersion: networking.k8s.io/v1 +kind: Ingress +metadata: + name: {{ include "lookout_ingester_v2.name" . }}-profiling + namespace: {{ .Release.Namespace }} + annotations: + certmanager.k8s.io/cluster-issuer: {{ required "A value is required for .Values.applicationConfig.profiling.clusterIssuer" .Values.applicationConfig.profiling.clusterIssuer }} + cert-manager.io/cluster-issuer: {{ required "A value is required for .Values.applicationConfig.profiling.clusterIssuer" .Values.applicationConfig.profiling.clusterIssuer }} + labels: + {{- include "lookout_ingester_v2.labels.all" . | nindent 4 }} +spec: + rules: + {{- range required "A value is required for .Values.applicationConfig.profiling.hostnames" .Values.applicationConfig.profiling.hostnames }} + - host: {{ . }} + http: + paths: + - path: / + pathType: Prefix + backend: + service: + name: {{ include "lookout_ingester_v2.name" $root }}-profiling + port: + number: {{ $root.Values.applicationConfig.profiling.port }} + {{ end -}} + tls: + - hosts: + {{- range required "A value is required for .Values.applicationConfig.profiling.hostnames" .Values.applicationConfig.profiling.hostnames }} + - {{ . -}} + {{ end }} + secretName: {{ include "lookout_ingester_v2.name" $root }}-profiling-service-tls +--- +{{- end }} + diff --git a/deployment/lookout-ingester-v2/templates/profiling-service.yaml b/deployment/lookout-ingester-v2/templates/profiling-service.yaml new file mode 100644 index 00000000000..41a9d2121c0 --- /dev/null +++ b/deployment/lookout-ingester-v2/templates/profiling-service.yaml @@ -0,0 +1,19 @@ +{{- if and .Values.applicationConfig.profiling .Values.applicationConfig.profiling.port }} +apiVersion: v1 +kind: Service +metadata: + name: {{ include "lookout_ingester_v2.name" . }}-profiling + namespace: {{ .Release.Namespace }} + labels: + {{- include "lookout_ingester_v2.labels.all" . | nindent 4 }} + name: {{ include "lookout_ingester_v2.name" . }}-profiling +spec: + selector: + app: {{ include "lookout_ingester_v2.name" . 
}} + ports: + - name: profiling + protocol: TCP + port: {{ .Values.applicationConfig.profiling.port }} +--- +{{- end }} + diff --git a/deployment/lookout-v2/templates/deployment.yaml b/deployment/lookout-v2/templates/deployment.yaml index f97d9f9ae8b..37aed4fff0c 100644 --- a/deployment/lookout-v2/templates/deployment.yaml +++ b/deployment/lookout-v2/templates/deployment.yaml @@ -51,7 +51,7 @@ spec: {{- if and .Values.applicationConfig.profiling .Values.applicationConfig.profiling.port }} - containerPort: {{ .Values.applicationConfig.profiling.port }} protocol: TCP - name: pprof + name: profiling {{- end }} - containerPort: {{ .Values.applicationConfig.metricsPort }} protocol: TCP diff --git a/deployment/lookout-v2/templates/profiling-ingress.yaml b/deployment/lookout-v2/templates/profiling-ingress.yaml new file mode 100644 index 00000000000..91ce1afaac3 --- /dev/null +++ b/deployment/lookout-v2/templates/profiling-ingress.yaml @@ -0,0 +1,35 @@ +{{- if and .Values.applicationConfig.profiling .Values.applicationConfig.profiling.hostnames }} +{{- $root := . -}} +apiVersion: networking.k8s.io/v1 +kind: Ingress +metadata: + name: {{ include "lookout_v2.name" . }}-profiling + namespace: {{ .Release.Namespace }} + annotations: + certmanager.k8s.io/cluster-issuer: {{ required "A value is required for .Values.applicationConfig.profiling.clusterIssuer" .Values.applicationConfig.profiling.clusterIssuer }} + cert-manager.io/cluster-issuer: {{ required "A value is required for .Values.applicationConfig.profiling.clusterIssuer" .Values.applicationConfig.profiling.clusterIssuer }} + labels: + {{- include "lookout_v2.labels.all" . | nindent 4 }} +spec: + rules: + {{- range required "A value is required for .Values.applicationConfig.profiling.hostnames" .Values.applicationConfig.profiling.hostnames }} + - host: {{ . }} + http: + paths: + - path: / + pathType: Prefix + backend: + service: + name: {{ include "lookout_v2.name" $root }}-profiling + port: + number: {{ $root.Values.applicationConfig.profiling.port }} + {{ end -}} + tls: + - hosts: + {{- range required "A value is required for .Values.applicationConfig.profiling.hostnames" .Values.applicationConfig.profiling.hostnames }} + - {{ . -}} + {{ end }} + secretName: {{ include "lookout_v2.name" $root }}-profiling-service-tls +--- +{{- end }} + diff --git a/deployment/lookout-v2/templates/profiling-service.yaml b/deployment/lookout-v2/templates/profiling-service.yaml new file mode 100644 index 00000000000..2cab97e6c1c --- /dev/null +++ b/deployment/lookout-v2/templates/profiling-service.yaml @@ -0,0 +1,19 @@ +{{- if and .Values.applicationConfig.profiling .Values.applicationConfig.profiling.port }} +apiVersion: v1 +kind: Service +metadata: + name: {{ include "lookout_v2.name" . }}-profiling + namespace: {{ .Release.Namespace }} + labels: + {{- include "lookout_v2.labels.all" . | nindent 4 }} + name: {{ include "lookout_v2.name" . }}-profiling +spec: + selector: + app: {{ include "lookout_v2.name" . }} + ports: + - name: profiling + protocol: TCP + port: {{ .Values.applicationConfig.profiling.port }} +--- +{{- end }} + diff --git a/deployment/scheduler/templates/scheduler-ingester-profiling-service.yaml b/deployment/scheduler/templates/scheduler-ingester-profiling-service.yaml index 6fd27f0d89b..5ee74583a49 100644 --- a/deployment/scheduler/templates/scheduler-ingester-profiling-service.yaml +++ b/deployment/scheduler/templates/scheduler-ingester-profiling-service.yaml @@ -10,7 +10,6 @@ metadata: spec: selector: app: {{ include "armada-scheduler.name" . 
}}-ingester - {{- include "armada-scheduler-ingester.labels.all" . | nindent 4 }} ports: - name: profiling protocol: TCP diff --git a/deployment/scheduler/templates/scheduler-profiling-service.yaml b/deployment/scheduler/templates/scheduler-profiling-service.yaml index 51b665f2940..77e30c396ad 100644 --- a/deployment/scheduler/templates/scheduler-profiling-service.yaml +++ b/deployment/scheduler/templates/scheduler-profiling-service.yaml @@ -12,7 +12,6 @@ metadata: spec: selector: statefulset.kubernetes.io/pod-name: {{ include "armada-scheduler.name" $root }}-{{ $i }} - {{- include "armada-scheduler.labels.identity" $root | nindent 4 }} ports: - name: profiling protocol: TCP diff --git a/docs/developer/pprof.md b/docs/developer/pprof.md index 6033f73d69e..dfd2117b74b 100644 --- a/docs/developer/pprof.md +++ b/docs/developer/pprof.md @@ -14,6 +14,5 @@ pprof: ["everyone"] ``` - It's possible to put pprof behind auth, see [api.md#authentication](./api.md#authentication) and [oidc.md](./oidc.md). -- For the scheduler, the helm chart will make a service and ingress for every pod. These are named `armada-scheduler-0-profiling` etc. -- For the scheduler ingester, the helm chart will make a single service and ingress called `armada-scheduler-ingester-profiling`. Note calls to these may not consistently go to the same pod. Use `kubectl port-forward`, or scale the deployment to size 1, if you need to consistently target one pod. -- For other services, the helm charts do not currently expose the profiling port. You can use `kubectl port-forward` to access these. +- For the Armada scheduler, the helm chart will make a service and ingress for every pod. These are named `armada-scheduler-0-profiling` etc. +- For other Armada components, the helm chart will make a single service and ingress called `armada-<component>-profiling`. Note calls to these may not consistently go to the same pod. Use `kubectl port-forward`, or scale the deployment to size 1, if you need to consistently target one pod.
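The Service and Ingress templates added in this patch only render when the corresponding keys are set under `applicationConfig.profiling`, and the `SetupPprof` change in the next diff additionally refuses to start without an auth block. A minimal values sketch, assuming the key names implied by the template conditions above and by the `docs/developer/pprof.md` example whose tail (`pprof: ["everyone"]`) is visible in the hunk above; the port, hostname and issuer values are placeholders, not chart defaults:

```yaml
applicationConfig:
  profiling:
    # Enables the profiling containerPort and the per-chart profiling Service.
    port: 6060
    # Enables the profiling Ingress; one rule is rendered per hostname.
    hostnames:
      - armada-profiling.example.com
    # Consumed by the Ingress template's cert-manager annotations.
    clusterIssuer: letsencrypt-dev
    # SetupPprof (http.go diff below) now returns early when auth is nil.
    auth:
      anonymousAuth: true
      permissionGroupMapping:
        pprof: ["everyone"]
```

As the updated docs note, traffic through these Services is not pod-sticky, so `kubectl port-forward` remains the way to profile one specific replica.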
diff --git a/internal/common/profiling/http.go b/internal/common/profiling/http.go index b691ad16893..5557ef190c4 100644 --- a/internal/common/profiling/http.go +++ b/internal/common/profiling/http.go @@ -23,6 +23,10 @@ func SetupPprof(config *configuration.ProfilingConfig, ctx *armadacontext.Contex } log.Infof("Setting up pprof server on port %d", config.Port) + if config.Auth == nil { + log.Errorf("Pprof server auth not configured, will not set up pprof") + return nil + } authServices, err := auth.ConfigureAuth(*config.Auth) if err != nil { @@ -48,15 +52,18 @@ func SetupPprof(config *configuration.ProfilingConfig, ctx *armadacontext.Contex return ctx, authorizer.AuthorizeAction(armadacontext.FromGrpcCtx(ctx), "pprof") }) + serveFunc := func() error { + if err := serve.ListenAndServe(ctx, pprofServer); err != nil { + logging.WithStacktrace(ctx, err).Error("pprof server failure") + } + return err + } + if g != nil { - g.Go(func() error { - return serve.ListenAndServe(ctx, pprofServer) - }) + g.Go(serveFunc) } else { go func() { - if err := serve.ListenAndServe(ctx, pprofServer); err != nil { - logging.WithStacktrace(ctx, err).Error("pprof server failure") - } + _ = serveFunc() }() } return nil From 7ac7f90206b8beae6d4895604301d5cb709da0bc Mon Sep 17 00:00:00 2001 From: JamesMurkin Date: Mon, 22 Jul 2024 13:50:53 +0100 Subject: [PATCH 3/4] Add test for pulsarBatchSize on ingestion pipeline (#3755) * Add test for pulsarBatchSize on ingestion pipeline This test just confirms pulsarBatchSize correctly limits the number of events we process at once Signed-off-by: JamesMurkin * Merge master - fix proto time Signed-off-by: JamesMurkin * Gofumpt Signed-off-by: JamesMurkin --------- Signed-off-by: JamesMurkin --- .../common/ingest/ingestion_pipeline_test.go | 91 ++++++++++++++++++- 1 file changed, 88 insertions(+), 3 deletions(-) diff --git a/internal/common/ingest/ingestion_pipeline_test.go b/internal/common/ingest/ingestion_pipeline_test.go index 51a4673b696..32f2644fc6e 100644 --- a/internal/common/ingest/ingestion_pipeline_test.go +++ b/internal/common/ingest/ingestion_pipeline_test.go @@ -185,15 +185,20 @@ func (s *simpleMessages) GetMessageIDs() []pulsar.MessageID { } type simpleConverter struct { - t *testing.T + t *testing.T + calls []*EventSequencesWithIds } -func newSimpleConverter(t *testing.T) InstructionConverter[*simpleMessages] { - return &simpleConverter{t} +func newSimpleConverter(t *testing.T) *simpleConverter { + return &simpleConverter{ + t: t, + calls: []*EventSequencesWithIds{}, + } } func (s *simpleConverter) Convert(_ *armadacontext.Context, msg *EventSequencesWithIds) *simpleMessages { s.t.Helper() + s.calls = append(s.calls, msg) assert.Len(s.t, msg.EventSequences, len(msg.MessageIds)) var converted []*simpleMessage for i, sequence := range msg.EventSequences { @@ -280,6 +285,65 @@ func TestRun_HappyPath_MultipleMessages(t *testing.T) { sink.assertDidProcess(messages) } +func TestRun_LimitsProcessingBatchSize(t *testing.T) { + tests := map[string]struct { + numberOfEventsPerMessage int + numberOfMessages int + batchSize int + expectedNumberOfBatchesProcessed int + }{ + "limits number of events processed per batch": { + numberOfEventsPerMessage: 1, + numberOfMessages: 5, + batchSize: 2, + expectedNumberOfBatchesProcessed: 3, + }, + "limit can be exceeded by one message": { + numberOfEventsPerMessage: 4, + numberOfMessages: 6, + batchSize: 5, + // Batches should get limited to 2 messages, each containing 4 events + expectedNumberOfBatchesProcessed: 3, + }, + } + + for
name, tc := range tests { + t.Run(name, func(t *testing.T) { + ctx, cancel := armadacontext.WithDeadline(armadacontext.Background(), time.Now().Add(10*time.Second)) + messages := []pulsar.Message{} + for i := 0; i < tc.numberOfMessages; i++ { + messages = append(messages, pulsarutils.NewPulsarMessage(i, baseTime, marshal(t, generateEventSequence(tc.numberOfEventsPerMessage)))) + } + mockConsumer := newMockPulsarConsumer(t, messages, cancel) + converter := newSimpleConverter(t) + sink := newSimpleSink(t) + + pipeline := testPipeline(mockConsumer, converter, sink) + // Should limit batches of event sequences based on batch size + pipeline.pulsarBatchSize = tc.batchSize + + start := time.Now() + err := pipeline.Run(ctx) + assert.NoError(t, err) + elapsed := time.Since(start) + expectedMaximumDuration := time.Duration(tc.expectedNumberOfBatchesProcessed) * batchDuration + + assert.LessOrEqual(t, elapsed, expectedMaximumDuration) + mockConsumer.assertDidAck(messages) + assert.Len(t, converter.calls, tc.expectedNumberOfBatchesProcessed) + for _, call := range converter.calls { + eventCount := 0 + for _, seq := range call.EventSequences { + eventCount += len(seq.Events) + } + // BatchSize can be exceeded by one message, so at most the number of events in a single message + assert.True(t, eventCount < tc.batchSize+tc.numberOfEventsPerMessage) + } + sink.assertDidProcess(messages) + }) + } +} + func testPipeline(consumer pulsar.Consumer, converter InstructionConverter[*simpleMessages], sink Sink[*simpleMessages]) *IngestionPipeline[*simpleMessages] { return &IngestionPipeline[*simpleMessages]{ pulsarConfig: configuration.PulsarConfig{ @@ -297,6 +361,27 @@ func testPipeline(consumer pulsar.Consumer, converter InstructionConverter[*simp } } +func generateEventSequence(numberOfEvents int) *armadaevents.EventSequence { + sequence := &armadaevents.EventSequence{ + Queue: "test", + JobSetName: "test", + UserId: "chrisma", + Events: []*armadaevents.EventSequence_Event{}, + } + for i := 0; i < numberOfEvents; i++ { + sequence.Events = append(sequence.Events, &armadaevents.EventSequence_Event{ + Created: baseTimeProto, + Event: &armadaevents.EventSequence_Event_JobRunSucceeded{ + JobRunSucceeded: &armadaevents.JobRunSucceeded{ + RunId: runIdProto, + JobId: jobIdProto, + }, + }, + }) + } + return sequence +} + func marshal(t *testing.T, es *armadaevents.EventSequence) []byte { payload, err := proto.Marshal(es) assert.NoError(t, err) From 56c9d9f1032d9144f0b2d09eea2908f06cb0ca78 Mon Sep 17 00:00:00 2001 From: JamesMurkin Date: Mon, 22 Jul 2024 14:00:42 +0100 Subject: [PATCH 4/4] Remove unnecessary passing of priorityClasses + remove unused fields (#3801) We pass priorityClasses around a lot, but largely these are unused at the bottom of the stack - I've removed this passing where the priorityClass is not used Removed a few other minor unused parameters Signed-off-by: JamesMurkin --- internal/scheduler/constraints/constraints.go | 2 +- .../scheduler/constraints/constraints_test.go | 2 +- internal/scheduler/context/context.go | 11 +----- internal/scheduler/context/context_test.go | 4 -- internal/scheduler/gang_scheduler.go | 2 +- internal/scheduler/gang_scheduler_test.go | 4 +- internal/scheduler/jobiteration.go | 19 ++++------ internal/scheduler/jobiteration_test.go | 16 ++++---- internal/scheduler/nodedb/nodedb.go | 21 +++++------ internal/scheduler/nodedb/nodedb_test.go | 26 ++++++------- .../scheduler/preempting_queue_scheduler.go | 37 +++++++------------ .../preempting_queue_scheduler_test.go | 11 +----- 
internal/scheduler/queue_scheduler_test.go | 7 +--- internal/scheduler/scheduler_metrics_test.go | 2 +- internal/scheduler/scheduler_test.go | 4 +- internal/scheduler/scheduling_algo.go | 2 - internal/scheduler/simulator/simulator.go | 4 +- internal/scheduler/submitcheck.go | 2 +- 18 files changed, 65 insertions(+), 111 deletions(-) diff --git a/internal/scheduler/constraints/constraints.go b/internal/scheduler/constraints/constraints.go index 0a7ddbcf1bd..5a57f476a23 100644 --- a/internal/scheduler/constraints/constraints.go +++ b/internal/scheduler/constraints/constraints.go @@ -150,7 +150,7 @@ func ScaleQuantity(q resource.Quantity, f float64) resource.Quantity { return q } -func (constraints *SchedulingConstraints) CheckRoundConstraints(sctx *schedulercontext.SchedulingContext, queue string) (bool, string, error) { +func (constraints *SchedulingConstraints) CheckRoundConstraints(sctx *schedulercontext.SchedulingContext) (bool, string, error) { // maximumResourcesToSchedule check. if !isStrictlyLessOrEqual(sctx.ScheduledResources.Resources, constraints.maximumResourcesToSchedule) { return false, MaximumResourcesScheduledUnschedulableReason, nil diff --git a/internal/scheduler/constraints/constraints_test.go b/internal/scheduler/constraints/constraints_test.go index e422c23e7f6..4d749827f6c 100644 --- a/internal/scheduler/constraints/constraints_test.go +++ b/internal/scheduler/constraints/constraints_test.go @@ -144,7 +144,7 @@ func TestConstraints(t *testing.T) { } for name, tc := range tests { t.Run(name, func(t *testing.T) { - ok, unscheduledReason, err := tc.constraints.CheckRoundConstraints(tc.sctx, tc.queue) + ok, unscheduledReason, err := tc.constraints.CheckRoundConstraints(tc.sctx) require.NoError(t, err) require.Equal(t, tc.expectedCheckRoundConstraintsReason == "", ok) require.Equal(t, tc.expectedCheckRoundConstraintsReason, unscheduledReason) diff --git a/internal/scheduler/context/context.go b/internal/scheduler/context/context.go index 81b63586445..086348e2fc1 100644 --- a/internal/scheduler/context/context.go +++ b/internal/scheduler/context/context.go @@ -19,7 +19,6 @@ import ( "github.com/armadaproject/armada/internal/common/armadaerrors" armadamaps "github.com/armadaproject/armada/internal/common/maps" armadaslices "github.com/armadaproject/armada/internal/common/slices" - "github.com/armadaproject/armada/internal/common/types" schedulerconfig "github.com/armadaproject/armada/internal/scheduler/configuration" "github.com/armadaproject/armada/internal/scheduler/fairness" "github.com/armadaproject/armada/internal/scheduler/interfaces" @@ -36,10 +35,6 @@ type SchedulingContext struct { Finished time.Time // Pool for which we're currently scheduling jobs. Pool string - // Allowed priority classes. - PriorityClasses map[string]types.PriorityClass - // Default priority class. - DefaultPriorityClass string // Determines how fairness is computed. FairnessCostProvider fairness.FairnessCostProvider // Limits job scheduling rate globally across all queues. 
@@ -78,8 +73,6 @@ type SchedulingContext struct { func NewSchedulingContext( pool string, - priorityClasses map[string]types.PriorityClass, - defaultPriorityClass string, fairnessCostProvider fairness.FairnessCostProvider, limiter *rate.Limiter, totalResources schedulerobjects.ResourceList, @@ -87,8 +80,6 @@ func NewSchedulingContext( return &SchedulingContext{ Started: time.Now(), Pool: pool, - PriorityClasses: priorityClasses, - DefaultPriorityClass: defaultPriorityClass, FairnessCostProvider: fairnessCostProvider, Limiter: limiter, QueueSchedulingContexts: make(map[string]*QueueSchedulingContext), @@ -822,7 +813,7 @@ func GangInfoFromLegacySchedulerJob(job interfaces.MinimalJob) (GangInfo, error) return gangInfo, nil } -func JobSchedulingContextsFromJobs[J *jobdb.Job](priorityClasses map[string]types.PriorityClass, jobs []J) []*JobSchedulingContext { +func JobSchedulingContextsFromJobs[J *jobdb.Job](jobs []J) []*JobSchedulingContext { jctxs := make([]*JobSchedulingContext, len(jobs)) for i, job := range jobs { jctxs[i] = JobSchedulingContextFromJob(job) diff --git a/internal/scheduler/context/context_test.go b/internal/scheduler/context/context_test.go index 9ca097cd70c..dac16c728c4 100644 --- a/internal/scheduler/context/context_test.go +++ b/internal/scheduler/context/context_test.go @@ -40,8 +40,6 @@ func TestSchedulingContextAccounting(t *testing.T) { require.NoError(t, err) sctx := NewSchedulingContext( "pool", - testfixtures.TestPriorityClasses, - testfixtures.TestDefaultPriorityClass, fairnessCostProvider, nil, totalResources, @@ -243,8 +241,6 @@ func TestCalculateFairShares(t *testing.T) { require.NoError(t, err) sctx := NewSchedulingContext( "pool", - testfixtures.TestPriorityClasses, - testfixtures.TestDefaultPriorityClass, fairnessCostProvider, nil, tc.availableResources, diff --git a/internal/scheduler/gang_scheduler.go b/internal/scheduler/gang_scheduler.go index 6b611704b24..05de27a94fe 100644 --- a/internal/scheduler/gang_scheduler.go +++ b/internal/scheduler/gang_scheduler.go @@ -103,7 +103,7 @@ func (sch *GangScheduler) updateGangSchedulingContextOnFailure(gctx *schedulerco func (sch *GangScheduler) Schedule(ctx *armadacontext.Context, gctx *schedulercontext.GangSchedulingContext) (ok bool, unschedulableReason string, err error) { // Exit immediately if this is a new gang and we've hit any round limits. 
if !gctx.AllJobsEvicted { - if ok, unschedulableReason, err = sch.constraints.CheckRoundConstraints(sch.schedulingContext, gctx.Queue); err != nil || !ok { + if ok, unschedulableReason, err = sch.constraints.CheckRoundConstraints(sch.schedulingContext); err != nil || !ok { return } } diff --git a/internal/scheduler/gang_scheduler_test.go b/internal/scheduler/gang_scheduler_test.go index fd698b0fa05..1f105557c91 100644 --- a/internal/scheduler/gang_scheduler_test.go +++ b/internal/scheduler/gang_scheduler_test.go @@ -556,8 +556,6 @@ func TestGangScheduler(t *testing.T) { require.NoError(t, err) sctx := schedulercontext.NewSchedulingContext( "pool", - tc.SchedulingConfig.PriorityClasses, - tc.SchedulingConfig.DefaultPriorityClassName, fairnessCostProvider, rate.NewLimiter( rate.Limit(tc.SchedulingConfig.MaximumSchedulingRate), @@ -593,7 +591,7 @@ func TestGangScheduler(t *testing.T) { var actualScheduledIndices []int scheduledGangs := 0 for i, gang := range tc.Gangs { - jctxs := schedulercontext.JobSchedulingContextsFromJobs(tc.SchedulingConfig.PriorityClasses, gang) + jctxs := schedulercontext.JobSchedulingContextsFromJobs(gang) gctx := schedulercontext.NewGangSchedulingContext(jctxs) ok, reason, err := sch.Schedule(armadacontext.Background(), gctx) require.NoError(t, err) diff --git a/internal/scheduler/jobiteration.go b/internal/scheduler/jobiteration.go index 8f356db66e7..b814e9144a9 100644 --- a/internal/scheduler/jobiteration.go +++ b/internal/scheduler/jobiteration.go @@ -7,7 +7,6 @@ import ( "github.com/armadaproject/armada/internal/common/armadacontext" armadaslices "github.com/armadaproject/armada/internal/common/slices" - "github.com/armadaproject/armada/internal/common/types" schedulercontext "github.com/armadaproject/armada/internal/scheduler/context" "github.com/armadaproject/armada/internal/scheduler/jobdb" ) @@ -106,19 +105,17 @@ func (repo *InMemoryJobRepository) GetJobIterator(queue string) JobIterator { // QueuedJobsIterator is an iterator over all jobs in a queue. type QueuedJobsIterator struct { - repo JobRepository - jobIds []string - priorityClasses map[string]types.PriorityClass - idx int - ctx *armadacontext.Context + repo JobRepository + jobIds []string + idx int + ctx *armadacontext.Context } -func NewQueuedJobsIterator(ctx *armadacontext.Context, queue string, repo JobRepository, priorityClasses map[string]types.PriorityClass) *QueuedJobsIterator { +func NewQueuedJobsIterator(ctx *armadacontext.Context, queue string, repo JobRepository) *QueuedJobsIterator { return &QueuedJobsIterator{ - jobIds: repo.GetQueueJobIds(queue), - repo: repo, - priorityClasses: priorityClasses, - ctx: ctx, + jobIds: repo.GetQueueJobIds(queue), + repo: repo, + ctx: ctx, } } diff --git a/internal/scheduler/jobiteration_test.go b/internal/scheduler/jobiteration_test.go index d714a01531c..c4f10a00bfa 100644 --- a/internal/scheduler/jobiteration_test.go +++ b/internal/scheduler/jobiteration_test.go @@ -64,7 +64,7 @@ func TestMultiJobsIterator_TwoQueues(t *testing.T) { ctx := armadacontext.Background() its := make([]JobIterator, 3) for i, queue := range []string{"A", "B", "C"} { - it := NewQueuedJobsIterator(ctx, queue, repo, nil) + it := NewQueuedJobsIterator(ctx, queue, repo) its[i] = it } it := NewMultiJobsIterator(its...) 
@@ -93,7 +93,7 @@ func TestQueuedJobsIterator_OneQueue(t *testing.T) { expected = append(expected, job.Id()) } ctx := armadacontext.Background() - it := NewQueuedJobsIterator(ctx, "A", repo, nil) + it := NewQueuedJobsIterator(ctx, "A", repo) actual := make([]string, 0) for { jctx, err := it.Next() @@ -115,7 +115,7 @@ func TestQueuedJobsIterator_ExceedsBufferSize(t *testing.T) { expected = append(expected, job.Id()) } ctx := armadacontext.Background() - it := NewQueuedJobsIterator(ctx, "A", repo, nil) + it := NewQueuedJobsIterator(ctx, "A", repo) actual := make([]string, 0) for { jctx, err := it.Next() @@ -137,7 +137,7 @@ func TestQueuedJobsIterator_ManyJobs(t *testing.T) { expected = append(expected, job.Id()) } ctx := armadacontext.Background() - it := NewQueuedJobsIterator(ctx, "A", repo, nil) + it := NewQueuedJobsIterator(ctx, "A", repo) actual := make([]string, 0) for { jctx, err := it.Next() @@ -164,7 +164,7 @@ func TestCreateQueuedJobsIterator_TwoQueues(t *testing.T) { repo.Enqueue(job) } ctx := armadacontext.Background() - it := NewQueuedJobsIterator(ctx, "A", repo, nil) + it := NewQueuedJobsIterator(ctx, "A", repo) actual := make([]string, 0) for { jctx, err := it.Next() @@ -187,7 +187,7 @@ func TestCreateQueuedJobsIterator_RespectsTimeout(t *testing.T) { ctx, cancel := armadacontext.WithTimeout(armadacontext.Background(), time.Millisecond) time.Sleep(20 * time.Millisecond) defer cancel() - it := NewQueuedJobsIterator(ctx, "A", repo, nil) + it := NewQueuedJobsIterator(ctx, "A", repo) job, err := it.Next() assert.Nil(t, job) assert.ErrorIs(t, err, context.DeadlineExceeded) @@ -205,7 +205,7 @@ func TestCreateQueuedJobsIterator_NilOnEmpty(t *testing.T) { repo.Enqueue(job) } ctx := armadacontext.Background() - it := NewQueuedJobsIterator(ctx, "A", repo, nil) + it := NewQueuedJobsIterator(ctx, "A", repo) for job, err := it.Next(); job != nil; job, err = it.Next() { require.NoError(t, err) } @@ -243,7 +243,7 @@ func (repo *mockJobRepository) Enqueue(job *jobdb.Job) { } func (repo *mockJobRepository) GetJobIterator(ctx *armadacontext.Context, queue string) JobIterator { - return NewQueuedJobsIterator(ctx, queue, repo, nil) + return NewQueuedJobsIterator(ctx, queue, repo) } func (repo *mockJobRepository) GetQueueJobIds(queue string) []string { diff --git a/internal/scheduler/nodedb/nodedb.go b/internal/scheduler/nodedb/nodedb.go index efe0ae3d394..aaf24244154 100644 --- a/internal/scheduler/nodedb/nodedb.go +++ b/internal/scheduler/nodedb/nodedb.go @@ -861,7 +861,7 @@ func (nodeDb *NodeDb) selectNodeForJobWithFairPreemption(txn *memdb.Txn, jctx *s nodeCopy := node.node.DeepCopyNilKeys() for _, job := range node.evictedJobs { // Remove preempted job from node - err = nodeDb.unbindJobFromNodeInPlace(nodeDb.priorityClasses, job.JobSchedulingContext.Job, nodeCopy) + err = nodeDb.unbindJobFromNodeInPlace(job.JobSchedulingContext.Job, nodeCopy) if err != nil { return nil, err } @@ -942,7 +942,6 @@ func (nodeDb *NodeDb) bindJobToNodeInPlace(node *internaltypes.Node, job *jobdb. // the jobs' priorities to evictedPriority; they are not subtracted from AllocatedByJobId and // AllocatedByQueue. 
func (nodeDb *NodeDb) EvictJobsFromNode( - priorityClasses map[string]types.PriorityClass, jobFilter func(*jobdb.Job) bool, jobs []*jobdb.Job, node *internaltypes.Node, @@ -954,7 +953,7 @@ func (nodeDb *NodeDb) EvictJobsFromNode( continue } evicted = append(evicted, job) - if err := nodeDb.evictJobFromNodeInPlace(priorityClasses, job, node); err != nil { + if err := nodeDb.evictJobFromNodeInPlace(job, node); err != nil { return nil, nil, err } } @@ -962,7 +961,7 @@ func (nodeDb *NodeDb) EvictJobsFromNode( } // evictJobFromNodeInPlace is the in-place operation backing EvictJobsFromNode. -func (nodeDb *NodeDb) evictJobFromNodeInPlace(priorityClasses map[string]types.PriorityClass, job *jobdb.Job, node *internaltypes.Node) error { +func (nodeDb *NodeDb) evictJobFromNodeInPlace(job *jobdb.Job, node *internaltypes.Node) error { jobId := job.Id() if _, ok := node.AllocatedByJobId[jobId]; !ok { return errors.Errorf("job %s has no resources allocated on node %s", jobId, node.GetId()) @@ -1010,10 +1009,10 @@ func markAllocatable(allocatableByPriority map[int32]internaltypes.ResourceList, } // UnbindJobsFromNode returns a node with all elements of jobs unbound from it. -func (nodeDb *NodeDb) UnbindJobsFromNode(priorityClasses map[string]types.PriorityClass, jobs []*jobdb.Job, node *internaltypes.Node) (*internaltypes.Node, error) { +func (nodeDb *NodeDb) UnbindJobsFromNode(jobs []*jobdb.Job, node *internaltypes.Node) (*internaltypes.Node, error) { node = node.DeepCopyNilKeys() for _, job := range jobs { - if err := nodeDb.unbindJobFromNodeInPlace(priorityClasses, job, node); err != nil { + if err := nodeDb.unbindJobFromNodeInPlace(job, node); err != nil { return nil, err } } @@ -1021,16 +1020,16 @@ func (nodeDb *NodeDb) UnbindJobsFromNode(priorityClasses map[string]types.Priori } // UnbindJobFromNode returns a copy of node with job unbound from it. -func (nodeDb *NodeDb) UnbindJobFromNode(priorityClasses map[string]types.PriorityClass, job *jobdb.Job, node *internaltypes.Node) (*internaltypes.Node, error) { +func (nodeDb *NodeDb) UnbindJobFromNode(job *jobdb.Job, node *internaltypes.Node) (*internaltypes.Node, error) { node = node.DeepCopyNilKeys() - if err := nodeDb.unbindJobFromNodeInPlace(priorityClasses, job, node); err != nil { + if err := nodeDb.unbindJobFromNodeInPlace(job, node); err != nil { return nil, err } return node, nil } // unbindPodFromNodeInPlace is like UnbindJobFromNode, but doesn't make a copy of node. 
-func (nodeDb *NodeDb) unbindJobFromNodeInPlace(priorityClasses map[string]types.PriorityClass, job *jobdb.Job, node *internaltypes.Node) error {
+func (nodeDb *NodeDb) unbindJobFromNodeInPlace(job *jobdb.Job, node *internaltypes.Node) error {
     jobId := job.Id()
     requests := job.EfficientResourceRequirements()
@@ -1178,7 +1177,7 @@ func (nodeDb *NodeDb) AddEvictedJobSchedulingContextWithTxn(txn *memdb.Txn, inde
 }
 
 func nodeDbSchema(priorities []int32, resources []string) (*memdb.DBSchema, map[int32]string, map[int32]int) {
-    nodesTable, indexNameByPriority, keyIndexByPriority := nodesTableSchema(priorities, resources)
+    nodesTable, indexNameByPriority, keyIndexByPriority := nodesTableSchema(priorities)
     evictionsTable := evictionsTableSchema()
     return &memdb.DBSchema{
         Tables: map[string]*memdb.TableSchema{
@@ -1188,7 +1187,7 @@ func nodeDbSchema(priorities []int32, resources []string) (*memdb.DBSchema, map[
     }, indexNameByPriority, keyIndexByPriority
 }
 
-func nodesTableSchema(priorities []int32, resources []string) (*memdb.TableSchema, map[int32]string, map[int32]int) {
+func nodesTableSchema(priorities []int32) (*memdb.TableSchema, map[int32]string, map[int32]int) {
     indexes := make(map[string]*memdb.IndexSchema, len(priorities)+1)
     indexes["id"] = &memdb.IndexSchema{
         Name: "id",
diff --git a/internal/scheduler/nodedb/nodedb_test.go b/internal/scheduler/nodedb/nodedb_test.go
index dd07e25d07a..a42608d372a 100644
--- a/internal/scheduler/nodedb/nodedb_test.go
+++ b/internal/scheduler/nodedb/nodedb_test.go
@@ -72,7 +72,7 @@ func TestSelectNodeForPod_NodeIdLabel_Success(t *testing.T) {
     db, err := newNodeDbWithNodes(nodes)
     require.NoError(t, err)
     jobs := testfixtures.N1Cpu4GiJobs("A", testfixtures.PriorityClass0, 1)
-    jctxs := schedulercontext.JobSchedulingContextsFromJobs(testfixtures.TestPriorityClasses, jobs)
+    jctxs := schedulercontext.JobSchedulingContextsFromJobs(jobs)
     for _, jctx := range jctxs {
         txn := db.Txn(false)
         jctx.SetAssignedNodeId(nodeId)
@@ -97,7 +97,7 @@ func TestSelectNodeForPod_NodeIdLabel_Failure(t *testing.T) {
     db, err := newNodeDbWithNodes(nodes)
     require.NoError(t, err)
     jobs := testfixtures.N1Cpu4GiJobs("A", testfixtures.PriorityClass0, 1)
-    jctxs := schedulercontext.JobSchedulingContextsFromJobs(testfixtures.TestPriorityClasses, jobs)
+    jctxs := schedulercontext.JobSchedulingContextsFromJobs(jobs)
     for _, jctx := range jctxs {
         txn := db.Txn(false)
         jctx.SetAssignedNodeId("non-existent node")
@@ -133,32 +133,32 @@ func TestNodeBindingEvictionUnbinding(t *testing.T) {
     boundNode, err := nodeDb.bindJobToNode(entry, job, job.PodRequirements().Priority)
     require.NoError(t, err)
 
-    unboundNode, err := nodeDb.UnbindJobFromNode(testfixtures.TestPriorityClasses, job, boundNode)
+    unboundNode, err := nodeDb.UnbindJobFromNode(job, boundNode)
     require.NoError(t, err)
 
-    unboundMultipleNode, err := nodeDb.UnbindJobsFromNode(testfixtures.TestPriorityClasses, []*jobdb.Job{job}, boundNode)
+    unboundMultipleNode, err := nodeDb.UnbindJobsFromNode([]*jobdb.Job{job}, boundNode)
     require.NoError(t, err)
 
-    evictedJobs, evictedNode, err := nodeDb.EvictJobsFromNode(testfixtures.TestPriorityClasses, jobFilter, []*jobdb.Job{job}, boundNode)
+    evictedJobs, evictedNode, err := nodeDb.EvictJobsFromNode(jobFilter, []*jobdb.Job{job}, boundNode)
     require.NoError(t, err)
     assert.Equal(t, []*jobdb.Job{job}, evictedJobs)
 
-    evictedUnboundNode, err := nodeDb.UnbindJobFromNode(testfixtures.TestPriorityClasses, job, evictedNode)
+    evictedUnboundNode, err := nodeDb.UnbindJobFromNode(job, evictedNode)
     require.NoError(t, err)
 
     evictedBoundNode, err := nodeDb.bindJobToNode(evictedNode, job, job.PodRequirements().Priority)
     require.NoError(t, err)
 
-    _, _, err = nodeDb.EvictJobsFromNode(testfixtures.TestPriorityClasses, jobFilter, []*jobdb.Job{job}, entry)
+    _, _, err = nodeDb.EvictJobsFromNode(jobFilter, []*jobdb.Job{job}, entry)
     require.Error(t, err)
 
-    _, err = nodeDb.UnbindJobFromNode(testfixtures.TestPriorityClasses, job, entry)
+    _, err = nodeDb.UnbindJobFromNode(job, entry)
     require.NoError(t, err)
 
     _, err = nodeDb.bindJobToNode(boundNode, job, job.PodRequirements().Priority)
     require.Error(t, err)
 
-    _, _, err = nodeDb.EvictJobsFromNode(testfixtures.TestPriorityClasses, jobFilter, []*jobdb.Job{job}, evictedNode)
+    _, _, err = nodeDb.EvictJobsFromNode(jobFilter, []*jobdb.Job{job}, evictedNode)
     require.Error(t, err)
 
     assertNodeAccountingEqual(t, entry, unboundNode)
@@ -293,7 +293,7 @@ func TestEviction(t *testing.T) {
             for i, job := range jobs {
                 existingJobs[i] = job
             }
-            actualEvictions, _, err := nodeDb.EvictJobsFromNode(testfixtures.TestPriorityClasses, tc.jobFilter, existingJobs, entry)
+            actualEvictions, _, err := nodeDb.EvictJobsFromNode(tc.jobFilter, existingJobs, entry)
             require.NoError(t, err)
             expectedEvictions := make([]*jobdb.Job, 0, len(tc.expectedEvictions))
             for _, i := range tc.expectedEvictions {
@@ -434,7 +434,7 @@ func TestScheduleIndividually(t *testing.T) {
             nodeDb, err := newNodeDbWithNodes(tc.Nodes)
             require.NoError(t, err)
 
-            jctxs := schedulercontext.JobSchedulingContextsFromJobs(testfixtures.TestPriorityClasses, tc.Jobs)
+            jctxs := schedulercontext.JobSchedulingContextsFromJobs(tc.Jobs)
 
             for i, jctx := range jctxs {
                 nodeDbTxn := nodeDb.Txn(true)
@@ -524,7 +524,7 @@ func TestScheduleMany(t *testing.T) {
             require.NoError(t, err)
             for i, jobs := range tc.Jobs {
                 nodeDbTxn := nodeDb.Txn(true)
-                jctxs := schedulercontext.JobSchedulingContextsFromJobs(testfixtures.TestPriorityClasses, jobs)
+                jctxs := schedulercontext.JobSchedulingContextsFromJobs(jobs)
                 gctx := schedulercontext.NewGangSchedulingContext(jctxs)
                 ok, err := nodeDb.ScheduleManyWithTxn(nodeDbTxn, gctx)
                 require.NoError(t, err)
@@ -776,7 +776,7 @@ func benchmarkScheduleMany(b *testing.B, nodes []*schedulerobjects.Node, jobs []
 
     b.ResetTimer()
     for n := 0; n < b.N; n++ {
-        jctxs := schedulercontext.JobSchedulingContextsFromJobs(testfixtures.TestPriorityClasses, jobs)
+        jctxs := schedulercontext.JobSchedulingContextsFromJobs(jobs)
         gctx := schedulercontext.NewGangSchedulingContext(jctxs)
         txn := nodeDb.Txn(true)
         _, err := nodeDb.ScheduleManyWithTxn(txn, gctx)
diff --git a/internal/scheduler/preempting_queue_scheduler.go b/internal/scheduler/preempting_queue_scheduler.go
index 70981ac214d..8c386c5076b 100644
--- a/internal/scheduler/preempting_queue_scheduler.go
+++ b/internal/scheduler/preempting_queue_scheduler.go
@@ -14,7 +14,6 @@ import (
     "github.com/armadaproject/armada/internal/common/armadacontext"
     armadamaps "github.com/armadaproject/armada/internal/common/maps"
     armadaslices "github.com/armadaproject/armada/internal/common/slices"
-    "github.com/armadaproject/armada/internal/common/types"
     schedulerconstraints "github.com/armadaproject/armada/internal/scheduler/constraints"
     schedulercontext "github.com/armadaproject/armada/internal/scheduler/context"
     "github.com/armadaproject/armada/internal/scheduler/fairness"
@@ -117,7 +116,6 @@ func (sch *PreemptingQueueScheduler) Schedule(ctx *armadacontext.Context) (*Sche
         NewNodeEvictor(
             sch.jobRepo,
             sch.nodeDb,
-            sch.schedulingContext.PriorityClasses,
             func(ctx *armadacontext.Context, job *jobdb.Job) bool {
                 priorityClass := job.PriorityClass()
                 if !priorityClass.Preemptible {
@@ -180,7 +178,6 @@ func (sch *PreemptingQueueScheduler) Schedule(ctx *armadacontext.Context) (*Sche
         NewOversubscribedEvictor(
             sch.jobRepo,
             sch.nodeDb,
-            sch.schedulingContext.PriorityClasses,
         ),
     )
     if err != nil {
@@ -347,7 +344,6 @@ func (sch *PreemptingQueueScheduler) evictGangs(ctx *armadacontext.Context, txn
     evictor := NewFilteredEvictor(
         sch.jobRepo,
         sch.nodeDb,
-        sch.schedulingContext.PriorityClasses,
         gangNodeIds,
         gangJobIds,
     )
@@ -539,7 +535,7 @@ func (sch *PreemptingQueueScheduler) schedule(ctx *armadacontext.Context, inMemo
         if jobRepo == nil || reflect.ValueOf(jobRepo).IsNil() {
             jobIteratorByQueue[qctx.Queue] = evictedIt
         } else {
-            queueIt := NewQueuedJobsIterator(ctx, qctx.Queue, jobRepo, sch.schedulingContext.PriorityClasses)
+            queueIt := NewQueuedJobsIterator(ctx, qctx.Queue, jobRepo)
             jobIteratorByQueue[qctx.Queue] = NewMultiJobsIterator(evictedIt, queueIt)
         }
     }
@@ -588,7 +584,7 @@ func (sch *PreemptingQueueScheduler) unbindJobs(jctxs []*schedulercontext.JobSch
         if err != nil {
             return err
         }
-        node, err = sch.nodeDb.UnbindJobsFromNode(sch.schedulingContext.PriorityClasses, jobsOnNode, node)
+        node, err = sch.nodeDb.UnbindJobsFromNode(jobsOnNode, node)
         if err != nil {
             return err
         }
@@ -693,11 +689,10 @@ func (sch *PreemptingQueueScheduler) assertions(
 }
 
 type Evictor struct {
-    jobRepo JobRepository
-    nodeDb *nodedb.NodeDb
-    priorityClasses map[string]types.PriorityClass
-    nodeFilter func(*armadacontext.Context, *internaltypes.Node) bool
-    jobFilter func(*armadacontext.Context, *jobdb.Job) bool
+    jobRepo JobRepository
+    nodeDb *nodedb.NodeDb
+    nodeFilter func(*armadacontext.Context, *internaltypes.Node) bool
+    jobFilter func(*armadacontext.Context, *jobdb.Job) bool
 }
 
 type EvictorResult struct {
@@ -730,13 +725,11 @@ func (er *EvictorResult) SummaryString() string {
 func NewNodeEvictor(
     jobRepo JobRepository,
     nodeDb *nodedb.NodeDb,
-    priorityClasses map[string]types.PriorityClass,
     jobFilter func(*armadacontext.Context, *jobdb.Job) bool,
 ) *Evictor {
     return &Evictor{
-        jobRepo: jobRepo,
-        nodeDb: nodeDb,
-        priorityClasses: priorityClasses,
+        jobRepo: jobRepo,
+        nodeDb: nodeDb,
         nodeFilter: func(_ *armadacontext.Context, node *internaltypes.Node) bool {
             return len(node.AllocatedByJobId) > 0
         },
@@ -749,7 +742,6 @@ func NewNodeEvictor(
 func NewFilteredEvictor(
     jobRepo JobRepository,
     nodeDb *nodedb.NodeDb,
-    priorityClasses map[string]types.PriorityClass,
     nodeIdsToEvict map[string]bool,
     jobIdsToEvict map[string]bool,
 ) *Evictor {
@@ -757,9 +749,8 @@ func NewFilteredEvictor(
         return nil
     }
     return &Evictor{
-        jobRepo: jobRepo,
-        nodeDb: nodeDb,
-        priorityClasses: priorityClasses,
+        jobRepo: jobRepo,
+        nodeDb: nodeDb,
         nodeFilter: func(_ *armadacontext.Context, node *internaltypes.Node) bool {
             shouldEvict := nodeIdsToEvict[node.GetId()]
             return shouldEvict
@@ -776,16 +767,14 @@ func NewFilteredEvictor(
 func NewOversubscribedEvictor(
     jobRepo JobRepository,
     nodeDb *nodedb.NodeDb,
-    priorityClasses map[string]types.PriorityClass,
 ) *Evictor {
     // Populating overSubscribedPriorities relies on
     // - nodeFilter being called once before all calls to jobFilter and
     // - jobFilter being called for all jobs on that node before moving on to another node.
     var overSubscribedPriorities map[int32]bool
     return &Evictor{
-        jobRepo: jobRepo,
-        nodeDb: nodeDb,
-        priorityClasses: priorityClasses,
+        jobRepo: jobRepo,
+        nodeDb: nodeDb,
         nodeFilter: func(_ *armadacontext.Context, node *internaltypes.Node) bool {
             overSubscribedPriorities = make(map[int32]bool)
             for p, rl := range node.AllocatableByPriority {
@@ -843,7 +832,7 @@ func (evi *Evictor) Evict(ctx *armadacontext.Context, nodeDbTxn *memdb.Txn) (*Ev
             }
         }
         jobs := evi.jobRepo.GetExistingJobsByIds(jobIds)
-        evictedJobs, node, err := evi.nodeDb.EvictJobsFromNode(evi.priorityClasses, jobFilter, jobs, node)
+        evictedJobs, node, err := evi.nodeDb.EvictJobsFromNode(jobFilter, jobs, node)
         if err != nil {
             return nil, err
         }
diff --git a/internal/scheduler/preempting_queue_scheduler_test.go b/internal/scheduler/preempting_queue_scheduler_test.go
index 57c4553a5a5..aff13ae00fb 100644
--- a/internal/scheduler/preempting_queue_scheduler_test.go
+++ b/internal/scheduler/preempting_queue_scheduler_test.go
@@ -57,8 +57,7 @@ func TestEvictOversubscribed(t *testing.T) {
 
     evictor := NewOversubscribedEvictor(
         NewSchedulerJobRepositoryAdapter(jobDbTxn),
-        nodeDb,
-        config.PriorityClasses)
+        nodeDb)
     result, err := evictor.Evict(armadacontext.Background(), nodeDbTxn)
     require.NoError(t, err)
 
@@ -1797,7 +1796,7 @@ func TestPreemptingQueueScheduler(t *testing.T) {
                 nodeId := nodeIdByJobId[job.Id()]
                 node, err := nodeDb.GetNode(nodeId)
                 require.NoError(t, err)
-                node, err = nodeDb.UnbindJobFromNode(tc.SchedulingConfig.PriorityClasses, job, node)
+                node, err = nodeDb.UnbindJobFromNode(job, node)
                 require.NoError(t, err)
                 err = nodeDb.Upsert(node)
                 require.NoError(t, err)
@@ -1837,8 +1836,6 @@ func TestPreemptingQueueScheduler(t *testing.T) {
             require.NoError(t, err)
             sctx := schedulercontext.NewSchedulingContext(
                 "pool",
-                tc.SchedulingConfig.PriorityClasses,
-                tc.SchedulingConfig.DefaultPriorityClassName,
                 fairnessCostProvider,
                 limiter,
                 tc.TotalResources,
@@ -2200,8 +2197,6 @@ func BenchmarkPreemptingQueueScheduler(b *testing.B) {
     require.NoError(b, err)
     sctx := schedulercontext.NewSchedulingContext(
         "pool",
-        tc.SchedulingConfig.PriorityClasses,
-        tc.SchedulingConfig.DefaultPriorityClassName,
         fairnessCostProvider,
         limiter,
         nodeDb.TotalResources(),
@@ -2267,8 +2262,6 @@ func BenchmarkPreemptingQueueScheduler(b *testing.B) {
     for n := 0; n < b.N; n++ {
         sctx := schedulercontext.NewSchedulingContext(
             "pool",
-            tc.SchedulingConfig.PriorityClasses,
-            tc.SchedulingConfig.DefaultPriorityClassName,
             fairnessCostProvider,
             limiter,
             nodeDb.TotalResources(),
diff --git a/internal/scheduler/queue_scheduler_test.go b/internal/scheduler/queue_scheduler_test.go
index 5fa64fbbb4d..b77e7f8d19f 100644
--- a/internal/scheduler/queue_scheduler_test.go
+++ b/internal/scheduler/queue_scheduler_test.go
@@ -501,10 +501,7 @@ func TestQueueScheduler(t *testing.T) {
             }
             jobRepo := NewInMemoryJobRepository()
             jobRepo.EnqueueMany(
-                schedulercontext.JobSchedulingContextsFromJobs(
-                    tc.SchedulingConfig.PriorityClasses,
-                    legacySchedulerJobs,
-                ),
+                schedulercontext.JobSchedulingContextsFromJobs(legacySchedulerJobs),
             )
 
             fairnessCostProvider, err := fairness.NewDominantResourceFairness(
@@ -514,8 +511,6 @@ func TestQueueScheduler(t *testing.T) {
             require.NoError(t, err)
             sctx := schedulercontext.NewSchedulingContext(
                 "pool",
-                tc.SchedulingConfig.PriorityClasses,
-                tc.SchedulingConfig.DefaultPriorityClassName,
                 fairnessCostProvider,
                 rate.NewLimiter(
                     rate.Limit(tc.SchedulingConfig.MaximumSchedulingRate),
diff --git a/internal/scheduler/scheduler_metrics_test.go b/internal/scheduler/scheduler_metrics_test.go
index ae171ce52b6..f1ae54306e4 100644
--- a/internal/scheduler/scheduler_metrics_test.go
+++ b/internal/scheduler/scheduler_metrics_test.go
@@ -22,7 +22,7 @@ func TestAggregateJobs(t *testing.T) {
         testfixtures.Test1Cpu4GiJob("queue_a", testfixtures.PriorityClass0),
     }
 
-    actual := aggregateJobContexts(map[queuePriorityClassKey]int{}, schedulercontext.JobSchedulingContextsFromJobs(testfixtures.TestPriorityClasses, testJobs))
+    actual := aggregateJobContexts(map[queuePriorityClassKey]int{}, schedulercontext.JobSchedulingContextsFromJobs(testJobs))
 
     expected := map[queuePriorityClassKey]int{
         {queue: "queue_a", priorityClass: testfixtures.PriorityClass0}: 4,
diff --git a/internal/scheduler/scheduler_test.go b/internal/scheduler/scheduler_test.go
index 5578d5bcfca..f0be5629d0c 100644
--- a/internal/scheduler/scheduler_test.go
+++ b/internal/scheduler/scheduler_test.go
@@ -1436,8 +1436,8 @@ func NewSchedulerResultForTest[S ~[]T, T *jobdb.Job](
     nodeIdByJobId map[string]string,
 ) *SchedulerResult {
     return &SchedulerResult{
-        PreemptedJobs: schedulercontext.JobSchedulingContextsFromJobs(testfixtures.TestPriorityClasses, preemptedJobs),
-        ScheduledJobs: schedulercontext.JobSchedulingContextsFromJobs(testfixtures.TestPriorityClasses, scheduledJobs),
+        PreemptedJobs: schedulercontext.JobSchedulingContextsFromJobs(preemptedJobs),
+        ScheduledJobs: schedulercontext.JobSchedulingContextsFromJobs(scheduledJobs),
         NodeIdByJobId: nodeIdByJobId,
     }
 }
diff --git a/internal/scheduler/scheduling_algo.go b/internal/scheduler/scheduling_algo.go
index 9e96dce55df..d71e8fd3561 100644
--- a/internal/scheduler/scheduling_algo.go
+++ b/internal/scheduler/scheduling_algo.go
@@ -438,8 +438,6 @@ func (l *FairSchedulingAlgo) schedulePool(
     }
     sctx := schedulercontext.NewSchedulingContext(
         pool,
-        l.schedulingConfig.PriorityClasses,
-        l.schedulingConfig.DefaultPriorityClassName,
         fairnessCostProvider,
         l.limiter,
         totalResources,
diff --git a/internal/scheduler/simulator/simulator.go b/internal/scheduler/simulator/simulator.go
index ab74665fbd1..d2caab2248f 100644
--- a/internal/scheduler/simulator/simulator.go
+++ b/internal/scheduler/simulator/simulator.go
@@ -468,8 +468,6 @@ func (s *Simulator) handleScheduleEvent(ctx *armadacontext.Context) error {
     }
     sctx := schedulercontext.NewSchedulingContext(
         pool.Name,
-        s.schedulingConfig.PriorityClasses,
-        s.schedulingConfig.DefaultPriorityClassName,
         fairnessCostProvider,
         s.limiter,
         totalResources,
@@ -850,7 +848,7 @@ func (s *Simulator) unbindRunningJob(job *jobdb.Job) error {
     } else if node == nil {
         return errors.Errorf("node %s not found", run.NodeId())
     }
-    node, err = nodeDb.UnbindJobFromNode(s.schedulingConfig.PriorityClasses, job, node)
+    node, err = nodeDb.UnbindJobFromNode(job, node)
     if err != nil {
         return err
     }
diff --git a/internal/scheduler/submitcheck.go b/internal/scheduler/submitcheck.go
index f4100e53b2f..80f347d9adc 100644
--- a/internal/scheduler/submitcheck.go
+++ b/internal/scheduler/submitcheck.go
@@ -141,7 +141,7 @@ func (srv *SubmitChecker) Check(ctx *armadacontext.Context, jobs []*jobdb.Job) (
         return nil, fmt.Errorf("executor state not loaded")
     }
 
-    jobContexts := schedulercontext.JobSchedulingContextsFromJobs(srv.schedulingConfig.PriorityClasses, jobs)
+    jobContexts := schedulercontext.JobSchedulingContextsFromJobs(jobs)
     results := make(map[string]schedulingResult, len(jobs))
 
     // First, check if all jobs can be scheduled individually.