Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor scheduler's framework permit API #83756

Merged
merged 1 commit into from
Oct 19, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 17 additions & 23 deletions pkg/scheduler/framework/v1alpha1/framework.go
Original file line number Diff line number Diff line change
Expand Up @@ -542,21 +542,22 @@ func (f *framework) RunPermitPlugins(
ctx context.Context, state *CycleState, pod *v1.Pod, nodeName string) (status *Status) {
startTime := time.Now()
defer func() { recordExtensionPointDuration(startTime, permit, status) }()
timeout := maxTimeout
pluginsWaitTime := make(map[string]time.Duration)
statusCode := Success
for _, pl := range f.permitPlugins {
status, d := pl.Permit(ctx, state, pod, nodeName)
status, timeout := pl.Permit(ctx, state, pod, nodeName)
if !status.IsSuccess() {
if status.IsUnschedulable() {
msg := fmt.Sprintf("rejected by %q at permit: %v", pl.Name(), status.Message())
klog.V(4).Infof(msg)
return NewStatus(status.Code(), msg)
}
if status.Code() == Wait {
// Use the minimum timeout duration.
if timeout > d {
timeout = d
// Not allowed to be greater than maxTimeout.
if timeout > maxTimeout {
timeout = maxTimeout
}
pluginsWaitTime[pl.Name()] = timeout
statusCode = Wait
} else {
msg := fmt.Sprintf("error while running %q permit plugin for pod %q: %v", pl.Name(), pod.Name, status.Message())
Expand All @@ -569,27 +570,20 @@ func (f *framework) RunPermitPlugins(
// We now wait for the minimum duration if at least one plugin asked to
// wait (and no plugin rejected the pod)
if statusCode == Wait {
w := newWaitingPod(pod)
w := newWaitingPod(pod, pluginsWaitTime)
f.waitingPods.add(w)
defer f.waitingPods.remove(pod.UID)
timer := time.NewTimer(timeout)
klog.V(4).Infof("waiting for %v for pod %q at permit", timeout, pod.Name)
select {
case <-timer.C:
msg := fmt.Sprintf("pod %q rejected due to timeout after waiting %v at permit", pod.Name, timeout)
klog.V(4).Infof(msg)
return NewStatus(Unschedulable, msg)
case s := <-w.s:
if !s.IsSuccess() {
if s.IsUnschedulable() {
msg := fmt.Sprintf("rejected while waiting at permit: %v", s.Message())
klog.V(4).Infof(msg)
return NewStatus(s.Code(), msg)
}
msg := fmt.Sprintf("error received while waiting at permit for pod %q: %v", pod.Name, s.Message())
klog.Error(msg)
return NewStatus(Error, msg)
klog.V(4).Infof("waiting for pod %q at permit", pod.Name)
s := <-w.s
if !s.IsSuccess() {
if s.IsUnschedulable() {
msg := fmt.Sprintf("pod %q rejected while waiting at permit: %v", pod.Name, s.Message())
klog.V(4).Infof(msg)
return NewStatus(s.Code(), msg)
}
msg := fmt.Sprintf("error received while waiting at permit for pod %q: %v", pod.Name, s.Message())
klog.Error(msg)
return NewStatus(Error, msg)
}
}

Expand Down
12 changes: 8 additions & 4 deletions pkg/scheduler/framework/v1alpha1/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,10 +150,14 @@ func NewStatus(code Code, msg string) *Status {
type WaitingPod interface {
// GetPod returns a reference to the waiting pod.
GetPod() *v1.Pod
// Allow the waiting pod to be scheduled. Returns true if the allow signal was
// successfully delivered, false otherwise.
Allow() bool
// Reject declares the waiting pod unschedulable. Returns true if the allow signal
// GetPendingPlugins returns a list of pending permit plugin's name.
GetPendingPlugins() []string
// Allow declares the waiting pod is allowed to be scheduled by plugin pluginName.
// If this is the last remaining plugin to allow, then a success signal is delivered
// to unblock the pod.
// Returns true if the allow signal was successfully dealt with, false otherwise.
hex108 marked this conversation as resolved.
Show resolved Hide resolved
Allow(pluginName string) bool
// Reject declares the waiting pod unschedulable. Returns true if the reject signal
// was successfully delivered, false otherwise.
Reject(msg string) bool
}
Expand Down
64 changes: 56 additions & 8 deletions pkg/scheduler/framework/v1alpha1/waiting_pods_map.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,9 @@ limitations under the License.
package v1alpha1

import (
"fmt"
"sync"
"time"

"k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/types"
Expand Down Expand Up @@ -69,26 +71,66 @@ func (m *waitingPodsMap) iterate(callback func(WaitingPod)) {

// waitingPod represents a pod waiting in the permit phase.
type waitingPod struct {
pod *v1.Pod
s chan *Status
pod *v1.Pod
pendingPlugins map[string]*time.Timer
s chan *Status
mu sync.RWMutex
}

// newWaitingPod returns a new waitingPod instance.
func newWaitingPod(pod *v1.Pod) *waitingPod {
return &waitingPod{
func newWaitingPod(pod *v1.Pod, pluginsMaxWaitTime map[string]time.Duration) *waitingPod {
wp := &waitingPod{
pod: pod,
s: make(chan *Status),
}

wp.pendingPlugins = make(map[string]*time.Timer, len(pluginsMaxWaitTime))
for k, v := range pluginsMaxWaitTime {
plugin, waitTime := k, v
wp.pendingPlugins[plugin] = time.AfterFunc(waitTime, func() {
msg := fmt.Sprintf("rejected due to timeout after waiting %v at plugin %v",
hex108 marked this conversation as resolved.
Show resolved Hide resolved
waitTime, plugin)
wp.Reject(msg)
})
}

return wp
}

// GetPod returns a reference to the waiting pod.
func (w *waitingPod) GetPod() *v1.Pod {
return w.pod
}

// Allow the waiting pod to be scheduled. Returns true if the allow signal was
// successfully delivered, false otherwise.
func (w *waitingPod) Allow() bool {
// GetPendingPlugins returns a list of pending permit plugin's name.
func (w *waitingPod) GetPendingPlugins() []string {
w.mu.RLock()
defer w.mu.RUnlock()
plugins := make([]string, 0, len(w.pendingPlugins))
for p := range w.pendingPlugins {
plugins = append(plugins, p)
}

return plugins
}

// Allow declares the waiting pod is allowed to be scheduled by plugin pluginName.
// If this is the last remaining plugin to allow, then a success signal is delivered
// to unblock the pod.
// Returns true if the allow signal was successfully dealt with, false otherwise.
func (w *waitingPod) Allow(pluginName string) bool {
w.mu.Lock()
defer w.mu.Unlock()
if timer, exist := w.pendingPlugins[pluginName]; exist {
timer.Stop()
delete(w.pendingPlugins, pluginName)
hex108 marked this conversation as resolved.
Show resolved Hide resolved
}

// Only signal success status after all plugins have allowed
if len(w.pendingPlugins) != 0 {
hex108 marked this conversation as resolved.
Show resolved Hide resolved
return true
}

select {
case w.s <- NewStatus(Success, ""):
return true
Expand All @@ -97,9 +139,15 @@ func (w *waitingPod) Allow() bool {
}
}

// Reject declares the waiting pod unschedulable. Returns true if the allow signal
// Reject declares the waiting pod unschedulable. Returns true if the reject signal
// was successfully delivered, false otherwise.
func (w *waitingPod) Reject(msg string) bool {
w.mu.RLock()
defer w.mu.RUnlock()
for _, timer := range w.pendingPlugins {
timer.Stop()
}

select {
case w.s <- NewStatus(Unschedulable, msg):
return true
Expand Down
Loading