
Add CFS design #468

Merged 3 commits on Nov 20, 2017
177 changes: 177 additions & 0 deletions doc/design/scheduler.md
@@ -0,0 +1,177 @@
# Scheduler for TrainingJob

## Background

We are going to define a PaddlePaddle cluster job as a Kubernetes
[TPR](https://kubernetes.io/docs/tasks/access-kubernetes-api/extend-api-third-party-resource/) or
[CRD](https://kubernetes.io/docs/concepts/api-extension/custom-resources/).
Each job is described using a `yaml` representation called
[TrainingJob](../autoscale/README.md). When a `TrainingJob` resource is
submitted to the Kubernetes cluster, our customized `controller` program will
receive an event informing it of the resource creation/deletion.

The `controller` program should contain the following core functions:

- Parser to parse the `TrainingJob` resource into its corresponding job
  components, including:
  - `ReplicaSet` of the master process
Collaborator
Using ReplicaSet / StatefulSet / Job means we will depend on Kubernetes'
scheduler for scheduling Pods. Since we are creating our own scheduler, should
we rely on Kubernetes' scheduler or not? What are the pros and cons of each
case?

@typhoonzero typhoonzero Nov 13, 2017
Sorry for the late reply.
Yes, you are right. Not using the default k8s scheduler would give us more
control over `TrainingJob`s (see here). The scheduler is in charge of putting
pods on nodes.

Pros:

  • Can add weights to resource types like GPU when scheduling pods.
  • Scaling and scheduling can live in the same process.
  • New resource types such as FPGA can be taken care of.

Cons:

  • The core function of scheduling pods to nodes seems to be the same for the TrainingJob scheduler. The resource request per node won't change; we only change the number of pods to run, which is already done by the autoscaler.
  • Hard to implement; we would have to implement leader election for "High Availability".

I think using the default scheduler for the pod->node assignment is enough for
now; we only need to queue TrainingJobs by priority and hand them to k8s.

@helinwang helinwang Nov 14, 2017
@typhoonzero I see, thanks! Agree that using the default scheduler is better for our current use case.

Another related question: do we need to use k8s Job / StatefulSet? Another
possibility is that we submit the creation and deletion of Pods directly (but
still using the default scheduler).

Collaborator Author

> Another related question: do we need to use k8s Job / StatefulSet? Another possibility is that we submit the creation and deletion of Pods directly (but still using the default scheduler).

This is possible and may be useful. However, the controller would then have to
track all pods' status itself, which is what the k8s Job/StatefulSet
controllers already implement.

Pros of controlling pods directly:

  • Control scaling based on pod status, e.g. scale up/down the slowest pod (the pod running the smallest batch-id).
  • Dynamically change the resource requests of pods.

Collaborator Author

@helinwang @Yancey1989 what do you think of "submitting the creation and
deletion of Pods directly"? I'll update the doc if we all agree on this.

@Yancey1989 Yancey1989 Nov 16, 2017

> another possibility is we can submit the creation and deletion of Pods directly (but still using the default scheduler)

Maybe we also need a custom scheduler: only creating and deleting Pods still
produces Pending Pods, and the default scheduler uses a FIFO queue to schedule
them, so we cannot dynamically adjust the priorities of all the pending Pods.

Collaborator

@gongweibao: please take a look at this discussion; it's related to your work
converting the Python job-start code to the Go controller.

Collaborator

OK!

  - `ReplicaSet` or `StatefulSet` for the etcd cluster
  - `ReplicaSet` of the `pserver` process
  - `Job` of the `trainer` process
- Queue to sort `TrainingJob` resources for scheduling
- Scheduler to determine which job to run or to scale, based on:
  - Job static priority
  - Job resource request (GPU > CPU > Memory)
  - Job total running time of every pod
  - Cluster free resources
- Autoscaler to dynamically tune job resources.
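
The following is a minimal, illustrative Go sketch of the first two
responsibilities: parsing a `TrainingJob` into its component objects and
queueing the job for the scheduler. The `TrainingJob` fields and all function
names here are hypothetical placeholders, not part of this design.

```go
package main

import "fmt"

// TrainingJob is a simplified, hypothetical stand-in for the TPR/CRD resource.
type TrainingJob struct {
	Name        string
	MinInstance int
	MaxInstance int
	GPUPerPod   int
}

// components returns the names of the Kubernetes objects the parser would
// create for one TrainingJob: master, etcd, pserver and trainer.
func components(job TrainingJob) []string {
	return []string{
		job.Name + "-master",  // ReplicaSet of the master process
		job.Name + "-etcd",    // ReplicaSet/StatefulSet for etcd
		job.Name + "-pserver", // ReplicaSet of the pserver process
		job.Name + "-trainer", // Job of the trainer process
	}
}

// onTrainingJobAdded sketches what the controller could do on a creation
// event: parse the resource into components and enqueue the job so the
// scheduler decides when, and at what parallelism, to actually run it.
func onTrainingJobAdded(job TrainingJob, queue chan<- TrainingJob) {
	for _, c := range components(job) {
		fmt.Println("would create:", c)
	}
	queue <- job
}

func main() {
	queue := make(chan TrainingJob, 1)
	onTrainingJobAdded(TrainingJob{Name: "mnist", MinInstance: 2, MaxInstance: 6, GPUPerPod: 1}, queue)
	fmt.Println("queued:", (<-queue).Name)
}
```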

Cases that need to be considered during the implementation:

1. GPUs are much more expensive than CPUs, so jobs requiring GPU resources
should have higher priority to run on GPU machines than CPU-only jobs. Also, a
`TrainingJob` that requires GPUs must request enough CPU resources so that the
CPU work of launching CUDA kernels and doing memory copies does not limit the
performance of GPU-accelerated training jobs.
1. Jobs have priorities. Some offline jobs have higher priority so that they
can acquire enough resources to complete at the desired time, after which the
job results can be pushed to the production service. Other jobs, like
experiments and one-shot jobs, have lower priority; they can be scaled up when
the cluster is free and scaled down when the cluster is busy.
1. Otherwise, jobs should share the cluster resources fairly: if a job waits
long enough, it will eventually be scheduled onto the cluster, no matter how
low its priority is (unless the cluster is full of production services).
1. A cluster may run both online services and offline batch jobs. The online
@gongweibao gongweibao Nov 7, 2017

My understanding: we can classify jobs by their nature into several tiers:

  • Online
  • Offline
  • Team experiments
  • Personal experiments

This would give users a more intuitive understanding of job priority, and to
some extent avoid the current situation where everyone raises the priority of
their own jobs.

A job in a higher tier always has higher priority than a job in a lower tier;
finer-grained priority ordering only applies within the same tier.

When submitting a job, the user could be asked to provide both parameters: the
job nature and the job priority.

Collaborator Author

Priorities are already covered in the Interface section below.

services have high priority and are not interruptible, but `TrainingJobs` can
reuse the cluster resources at the times of day when the online services are
not that active.
1. About quotas: user quotas should be taken into account so that scheduled
jobs do not exceed them.
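
The first and last cases above could translate into a simple admission check at
job-submission time. This is only a sketch: the CPU-per-GPU ratio and the quota
lookup below are hypothetical values, not defined by this design.

```go
package main

import (
	"errors"
	"fmt"
)

// minCPUPerGPU is a hypothetical ratio: every requested GPU should come with
// at least this many CPU cores to feed kernel launches and memory copies.
const minCPUPerGPU = 4

// Request is a simplified view of one job's resource request.
type Request struct {
	User string
	GPU  int
	CPU  int
}

// validate rejects GPU jobs that starve their GPUs of CPU, or that exceed the
// user's remaining GPU quota (the quota is just a map here for illustration).
func validate(r Request, quotaGPU map[string]int) error {
	if r.GPU > 0 && r.CPU < r.GPU*minCPUPerGPU {
		return fmt.Errorf("GPU job should request at least %d CPUs", r.GPU*minCPUPerGPU)
	}
	if r.GPU > quotaGPU[r.User] {
		return errors.New("request exceeds user GPU quota")
	}
	return nil
}

func main() {
	quota := map[string]int{"alice": 8}
	fmt.Println(validate(Request{User: "alice", GPU: 2, CPU: 4}, quota)) // rejected: too little CPU
	fmt.Println(validate(Request{User: "alice", GPU: 2, CPU: 8}, quota)) // accepted
}
```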

## Scheduler design

Here we define the core scheduler interfaces and algorithms.

### Interface

The scheduler deals with an atomic scheduling unit named `Unit`. The
`TrainingJob` resource is a member of a `Unit`; we can get it by calling
`unit.Obj()`.

```go
type PrioLevel int

const (
	Experiement PrioLevel = 10
	Offline     PrioLevel = 100
	Normal      PrioLevel = 1000
	Production  PrioLevel = 10000
)

type Unit interface {
	// GetPrio returns the current priority level.
	GetPrio() PrioLevel
	// SetPrio sets the unit priority level directly.
	SetPrio(prio PrioLevel)

	// MaxInstances returns the desired max parallelism of the job.
	MaxInstances() int
	// MinInstances returns the minimal parallelism the job can run with.
	MinInstances() int
	// ResourceScore returns the resource score of a single pod. It's
	// calculated by sum(weight*ResourceValue).
	ResourceScore() int64

	// ExpectedCount returns the expected parallelism (how many pods) to run
	// for the current scheduling step.
	ExpectedCount() int64
	// RunningCount returns the current parallelism of the unit.
	// RunningCount == 0 means the job is waiting for resources.
	RunningCount() int64

	// Obj returns the inner scheduling object.
	Obj() interface{}
}
```

Currently, we only support 4 levels of priority. Note that the priority values
are not contiguous, so that we can add more levels in between later.
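
As an illustration of the `sum(weight*ResourceValue)` idea behind
`ResourceScore()`, a weighted sum with GPU weighted above CPU and memory might
look like the sketch below; the concrete weights are assumptions and not part
of this design.

```go
// Hypothetical weights reflecting relative cost: GPU > CPU > memory (per GiB).
const (
	gpuWeight int64 = 1000
	cpuWeight int64 = 10
	memWeight int64 = 1
)

// resourceScore is one possible realization of the sum(weight*ResourceValue)
// formula from the Unit interface, for a single pod's resource request.
func resourceScore(gpus, cpus, memGiB int64) int64 {
	return gpuWeight*gpus + cpuWeight*cpus + memWeight*memGiB
}
```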

Then we define the scheduler interface:

```go
type GpuPriorityCFS interface {
	// AddUnit inserts a new Unit object into the scheduler.
	AddUnit(unit *Unit) error
	// DelUnit removes a completed unit from the scheduler.
	DelUnit(unit *Unit) error
	// GetLeftMost returns the smallest-valued unit in the scheduler's tree.
	GetLeftMost() *Unit
	// GetRightMost returns the largest-valued unit in the scheduler's tree.
	GetRightMost() *Unit
	// Len returns the number of units in the scheduler.
	Len() int

	// Tranverse goes through every unit in the scheduler and calls the callbacks.
	Tranverse(callback ...func(*Unit)) error
}
```
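
A brief usage sketch of the interface above, as the controller might drive it
on each scheduling pass. It assumes only the `Unit` and `GpuPriorityCFS`
definitions above; the function name and its body are hypothetical.

```go
// schedulePass sketches one pass over the scheduler's tree.
func schedulePass(s GpuPriorityCFS) error {
	// Walk every unit once; a real pass would accumulate the
	// ExpectedCount() diffs described in the next section.
	if err := s.Tranverse(func(u *Unit) {
		_ = (*u).RunningCount() // inspect the current parallelism here
	}); err != nil {
		return err
	}

	// The left-most unit has accumulated the least weighted running time,
	// so it is the first candidate to be scaled up.
	if u := s.GetLeftMost(); u != nil {
		_ = (*u).ExpectedCount() // decide the scale-up for this unit here
	}
	return nil
}
```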

### Scheduling algorithm

We use an implementation similar to
[CFS](https://en.wikipedia.org/wiki/Completely_Fair_Scheduler) as the
default scheduler for `TrainingJobs`. Other jobs or services submitted using
`kubectl` are not controlled by this scheduler, but their resource
consumption is taken into account.

The scheduler stores all units in a red-black tree, sorted by the score
`GetPrio() * ResourceScore() * sum(RunningCount() * pod.RunningTime())`
(the weighted total running time). In order to keep the jobs
"fair", each `Unit`'s `ExpectedCount()` is calculated by traversing every unit
in order and increasing/decreasing it one by one in each "dry run", trying to
make the score even across the cluster:

1. The leftmost unit is selected. This is the job that has spent the least
weighted running time on the cluster.
2. If the job is not running yet (a newly submitted job), try to increase its
parallelism to the initial parallelism value. If the resources are sufficient,
parse the `TrainingJob` and create the job instance; if the resources are not
sufficient, go to step 1.
3. If the job is already running, try to scale it up by 1 to use more of the
free resources.
4. If the job has completed, stop the pserver and master, then remove it from
the tree.
5. Go back to step 1 and repeat until all the units have been traversed.
6. Accumulate the diff for each unit.
7. If the above steps produce no diff for any job, use the same strategy to
scale down some jobs to achieve fairness: call `GetRightMost()` to get the
rightmost unit, and try to scale down jobs whose score is far from even.

- NOTE: we should perform scale up/down operations sparingly, because cluster
jobs are not like processes; frequent interruptions may cause significant job
performance issues.
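
To make the ordering key concrete, a sketch of the score computation might look
like the following. It simply mirrors the formula above; `podRunningSeconds` is
a hypothetical per-pod running-time lookup, not part of the design.

```go
// score computes the red-black tree ordering key for one unit:
// priority * per-pod resource score * accumulated pod running time.
// Smaller scores sort to the left and are picked up first for scaling up.
func score(prio PrioLevel, resourceScore int64, podRunningSeconds []int64) int64 {
	var totalRunTime int64
	for _, sec := range podRunningSeconds {
		totalRunTime += sec
	}
	return int64(prio) * resourceScore * totalRunTime
}
```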

### Queues

We **don't** put jobs into several queues, like "RUNNING", "TODO", "DONE". Only
**one** queue is used for indexing jobs. Jobs that have not started consume
no resources.

### Scheduling Intervals And Freezing Window

Scheduling operations are triggered:

- Every 5 seconds
- Or by controller events: adding, updating, or deleting a `TrainingJob`

Speaking of "fairness", if we perform scaling operations too frequently, every
job's trainer count will be constantly in flux, which is what we want to avoid.
We introduce a configurable `FreezingWindow` for every `TrainingJob`; within
that time window, the job should not undergo any scaling operations, so as to
minimize the cost introduced by scaling the job.
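
A minimal sketch of how the trigger loop and the freezing-window check could
fit together; the event channel and the function names are assumptions made for
illustration only.

```go
import "time"

// canScale reports whether a job is outside its freezing window, i.e. enough
// time has passed since its last scaling operation.
func canScale(lastScaled time.Time, freezingWindow time.Duration) bool {
	return time.Since(lastScaled) >= freezingWindow
}

// run drives scheduling passes every 5 seconds and on every controller event
// (TrainingJob added/updated/deleted).
func run(events <-chan struct{}, pass func()) {
	tick := time.NewTicker(5 * time.Second)
	defer tick.Stop()
	for {
		select {
		case <-tick.C:
			pass()
		case <-events:
			pass()
		}
	}
}
```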

## References

- https://en.wikipedia.org/wiki/Completely_Fair_Scheduler
- https://kubernetes.io/docs/tasks/access-kubernetes-api/extend-api-third-party-resource/#what-is-thirdpartyresource
- https://kubernetes.io/docs/tasks/access-kubernetes-api/extend-api-custom-resource-definitions/
- https://kubernetes.io/docs/concepts/configuration/manage-compute-resources-container/
56 changes: 46 additions & 10 deletions go/cfs/cfs.go
@@ -21,24 +21,60 @@ package cfs
// "github.com/sakeven/RbTree"
// )

// Node represent minimum cell to schedule.
// PrioLevel is an enum type indicating priority levels.
type PrioLevel int

const (
	// Experiement priority level
	Experiement PrioLevel = 0
	// Offline job
	Offline = 3
	// Normal jobs
	Normal = 7
	// Production level job
	Production = 11
)

// Node is the atomic schedule unit for the scheduler.
type Node interface {
	// the mixed weight of current node.
	Weight() float64
	// GetPrio returns the current priority level.
	GetPrio() PrioLevel
	// SetPrio sets the node priority level directly.
	SetPrio(prio PrioLevel)

	// MaxInstances returns the desired max parallelism of the job.
	MaxInstances() int
	// MinInstances returns the minimal parallelism the job can run with.
	MinInstances() int
	// ResourceScore returns the resource score of a single pod. It's
	// calculated by sum(weight*ResourceValue).
	ResourceScore() int64

	// get the object to be scheduled.
	// Expected returns the expected parallelism (how many pods) to run for
	// the current scheduling step.
	Expected() int64
	// Running returns the current parallelism of the node.
	// Running == 0 means the job is waiting for resources.
	Running() int64

	// Obj returns the inner scheduling unit.
	Obj() *interface{}
}

// WeightedAccelleratorCFS is a scheduler to schedule jobs/processes to use
// GpuPriorityCFS is a scheduler to schedule jobs/processes to use
// multiple kinds of processors, like both CPU and GPU, or mixed with FPGA etc.
type WeightedAccelleratorCFS interface {
type GpuPriorityCFS interface {
	// AddNode inserts a new node into the scheduler.
	AddNode(node *Node) error
	// DelNode removes a completed node from the scheduler.
	DelNode(node *Node) error
	// GetLeftMost returns the smallest-valued node in the scheduler's tree.
	GetLeftMost() *Node
	// GetRightMost returns the maximum-valued node in the scheduler's tree.
	GetRightMost() *Node
	// Len returns the number of nodes in the scheduler.
	Len() int

	// tranverse all the nodes and call the callback function.
	// Tranverse goes through every node in the scheduler.
	Tranverse(callback ...func(*Node)) error

	// Get one node need to schedule, which have waited long enough.
	Get() *Node
}