Skip to content
This repository has been archived by the owner on Feb 1, 2021. It is now read-only.

Commit

Permalink
Merge pull request #1578 from aluzzardi/rescheduling
Browse files Browse the repository at this point in the history
[experimental] Simple container rescheduling on node failure
  • Loading branch information
abronan committed Jan 12, 2016
2 parents 4a58289 + 74dfe8b commit e121338
Show file tree
Hide file tree
Showing 13 changed files with 490 additions and 35 deletions.
8 changes: 7 additions & 1 deletion api/handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -438,8 +438,14 @@ func postContainersCreate(c *context, w http.ResponseWriter, r *http.Request) {
authConfig = &dockerclient.AuthConfig{}
json.Unmarshal(buf, authConfig)
}
containerConfig := cluster.BuildContainerConfig(config)

container, err := c.cluster.CreateContainer(cluster.BuildContainerConfig(config), name, authConfig)
if err := containerConfig.Validate(); err != nil {
httpError(w, err.Error(), http.StatusInternalServerError)
return
}

container, err := c.cluster.CreateContainer(containerConfig, name, authConfig)
if err != nil {
if strings.HasPrefix(err.Error(), "Conflict") {
httpError(w, err.Error(), http.StatusConflict)
Expand Down
1 change: 1 addition & 0 deletions cli/manage.go
Original file line number Diff line number Diff line change
Expand Up @@ -321,5 +321,6 @@ func manage(c *cli.Context) {
server.SetHandler(api.NewPrimary(cl, tlsConfig, &statusHandler{cl, nil, nil}, c.GlobalBool("debug"), c.Bool("cors")))
}

cluster.NewWatchdog(cl)
log.Fatal(server.ListenAndServe())
}
3 changes: 3 additions & 0 deletions cluster/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,9 @@ type Cluster interface {
// Register an event handler for cluster-wide events.
RegisterEventHandler(h EventHandler) error

// Unregister an event handler.
UnregisterEventHandler(h EventHandler)

// FIXME: remove this method
// Return a random engine
RANDOMENGINE() (*Engine, error)
Expand Down
55 changes: 51 additions & 4 deletions cluster/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ package cluster

import (
"encoding/json"
"errors"
"fmt"
"strings"

"github.com/samalba/dockerclient"
Expand Down Expand Up @@ -63,9 +65,10 @@ func consolidateResourceFields(c *dockerclient.ContainerConfig) {
// BuildContainerConfig creates a cluster.ContainerConfig from a dockerclient.ContainerConfig
func BuildContainerConfig(c dockerclient.ContainerConfig) *ContainerConfig {
var (
affinities []string
constraints []string
env []string
affinities []string
constraints []string
reschedulePolicies []string
env []string
)

// only for tests
Expand All @@ -83,12 +86,19 @@ func BuildContainerConfig(c dockerclient.ContainerConfig) *ContainerConfig {
json.Unmarshal([]byte(labels), &constraints)
}

// parse affinities/constraints from env (ex. docker run -e affinity:container==redis -e affinity:image==nginx -e constraint:region==us-east -e constraint:storage==ssd)
// parse reschedule policy from labels (ex. docker run --label 'com.docker.swarm.reschedule-policies=on-node-failure')
if labels, ok := c.Labels[SwarmLabelNamespace+".reschedule-policies"]; ok {
json.Unmarshal([]byte(labels), &reschedulePolicies)
}

// parse affinities/constraints/reschedule policies from env (ex. docker run -e affinity:container==redis -e affinity:image==nginx -e constraint:region==us-east -e constraint:storage==ssd -e reschedule:off)
for _, e := range c.Env {
if ok, key, value := parseEnv(e); ok && key == "affinity" {
affinities = append(affinities, value)
} else if ok && key == "constraint" {
constraints = append(constraints, value)
} else if ok && key == "reschedule" {
reschedulePolicies = append(reschedulePolicies, value)
} else {
env = append(env, e)
}
Expand All @@ -111,6 +121,13 @@ func BuildContainerConfig(c dockerclient.ContainerConfig) *ContainerConfig {
}
}

// store reschedule policies in labels
if len(reschedulePolicies) > 0 {
if labels, err := json.Marshal(reschedulePolicies); err == nil {
c.Labels[SwarmLabelNamespace+".reschedule-policies"] = string(labels)
}
}

consolidateResourceFields(&c)

return &ContainerConfig{c}
Expand Down Expand Up @@ -186,3 +203,33 @@ func (c *ContainerConfig) HaveNodeConstraint() bool {
}
return false
}

// HasReschedulePolicy returns true if the specified policy is part of the config
func (c *ContainerConfig) HasReschedulePolicy(p string) bool {
for _, reschedulePolicy := range c.extractExprs("reschedule-policies") {
if reschedulePolicy == p {
return true
}
}
return false
}

// Validate returns an error if the config isn't valid
func (c *ContainerConfig) Validate() error {
//TODO: add validation for affinities and constraints
reschedulePolicies := c.extractExprs("reschedule-policies")
if len(reschedulePolicies) > 1 {
return errors.New("too many reschedule policies")
} else if len(reschedulePolicies) == 1 {
valid := false
for _, validReschedulePolicy := range []string{"off", "on-node-failure"} {
if reschedulePolicies[0] == validReschedulePolicy {
valid = true
}
}
if !valid {
return fmt.Errorf("invalid reschedule policy: %s", reschedulePolicies[0])
}
}
return nil
}
5 changes: 5 additions & 0 deletions cluster/container.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,11 @@ func (c *Container) Refresh() (*Container, error) {
return c.Engine.refreshContainer(c.Id, true)
}

// Start a container
func (c *Container) Start() error {
return c.Engine.client.StartContainer(c.Id, nil)
}

// Containers represents a list a containers
type Containers []*Container

Expand Down
54 changes: 53 additions & 1 deletion cluster/event.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,12 @@
package cluster

import "github.com/samalba/dockerclient"
import (
"errors"
"sync"

log "github.com/Sirupsen/logrus"
"github.com/samalba/dockerclient"
)

// Event is exported
type Event struct {
Expand All @@ -12,3 +18,49 @@ type Event struct {
type EventHandler interface {
Handle(*Event) error
}

// EventHandlers is a map of EventHandler
type EventHandlers struct {
sync.RWMutex

eventHandlers map[EventHandler]struct{}
}

// NewEventHandlers returns a EventHandlers
func NewEventHandlers() *EventHandlers {
return &EventHandlers{
eventHandlers: make(map[EventHandler]struct{}),
}
}

// Handle callbacks for the events
func (eh *EventHandlers) Handle(e *Event) {
eh.RLock()
defer eh.RUnlock()

for h := range eh.eventHandlers {
if err := h.Handle(e); err != nil {
log.Error(err)
}
}
}

// RegisterEventHandler registers an event handler.
func (eh *EventHandlers) RegisterEventHandler(h EventHandler) error {
eh.Lock()
defer eh.Unlock()

if _, ok := eh.eventHandlers[h]; ok {
return errors.New("event handler already set")
}
eh.eventHandlers[h] = struct{}{}
return nil
}

// UnregisterEventHandler unregisters a previously registered event handler.
func (eh *EventHandlers) UnregisterEventHandler(h EventHandler) {
eh.Lock()
defer eh.Unlock()

delete(eh.eventHandlers, h)
}
21 changes: 9 additions & 12 deletions cluster/mesos/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ type Cluster struct {

driver *mesosscheduler.MesosSchedulerDriver
dockerEnginePort string
eventHandler cluster.EventHandler
eventHandlers *cluster.EventHandlers
master string
agents map[string]*agent
scheduler *scheduler.Scheduler
Expand Down Expand Up @@ -67,6 +67,7 @@ func NewCluster(scheduler *scheduler.Scheduler, TLSConfig *tls.Config, master st
}
cluster := &Cluster{
dockerEnginePort: defaultDockerEnginePort,
eventHandlers: cluster.NewEventHandlers(),
master: master,
agents: make(map[string]*agent),
scheduler: scheduler,
Expand Down Expand Up @@ -156,22 +157,18 @@ func NewCluster(scheduler *scheduler.Scheduler, TLSConfig *tls.Config, master st

// Handle callbacks for the events
func (c *Cluster) Handle(e *cluster.Event) error {
if c.eventHandler == nil {
return nil
}
if err := c.eventHandler.Handle(e); err != nil {
log.Error(err)
}
c.eventHandlers.Handle(e)
return nil
}

// RegisterEventHandler registers an event handler.
func (c *Cluster) RegisterEventHandler(h cluster.EventHandler) error {
if c.eventHandler != nil {
return errors.New("event handler already set")
}
c.eventHandler = h
return nil
return c.eventHandlers.RegisterEventHandler(h)
}

// UnregisterEventHandler unregisters a previously registered event handler.
func (c *Cluster) UnregisterEventHandler(h cluster.EventHandler) {
c.eventHandlers.UnregisterEventHandler(h)
}

// CreateContainer for container creation in Mesos task
Expand Down
30 changes: 15 additions & 15 deletions cluster/swarm/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ func (p *pendingContainer) ToContainer() *cluster.Container {
type Cluster struct {
sync.RWMutex

eventHandler cluster.EventHandler
eventHandlers *cluster.EventHandlers
engines map[string]*cluster.Engine
pendingEngines map[string]*cluster.Engine
scheduler *scheduler.Scheduler
Expand All @@ -67,6 +67,7 @@ func NewCluster(scheduler *scheduler.Scheduler, TLSConfig *tls.Config, discovery
log.WithFields(log.Fields{"name": "swarm"}).Debug("Initializing cluster")

cluster := &Cluster{
eventHandlers: cluster.NewEventHandlers(),
engines: make(map[string]*cluster.Engine),
pendingEngines: make(map[string]*cluster.Engine),
scheduler: scheduler,
Expand All @@ -90,22 +91,18 @@ func NewCluster(scheduler *scheduler.Scheduler, TLSConfig *tls.Config, discovery

// Handle callbacks for the events
func (c *Cluster) Handle(e *cluster.Event) error {
if c.eventHandler == nil {
return nil
}
if err := c.eventHandler.Handle(e); err != nil {
log.Error(err)
}
c.eventHandlers.Handle(e)
return nil
}

// RegisterEventHandler registers an event handler.
func (c *Cluster) RegisterEventHandler(h cluster.EventHandler) error {
if c.eventHandler != nil {
return errors.New("event handler already set")
}
c.eventHandler = h
return nil
return c.eventHandlers.RegisterEventHandler(h)
}

// UnregisterEventHandler unregisters a previously registered event handler.
func (c *Cluster) UnregisterEventHandler(h cluster.EventHandler) {
c.eventHandlers.UnregisterEventHandler(h)
}

// Generate a globally (across the cluster) unique ID.
Expand Down Expand Up @@ -145,9 +142,12 @@ func (c *Cluster) createContainer(config *cluster.ContainerConfig, name string,
return nil, fmt.Errorf("Conflict: The name %s is already assigned. You have to delete (or rename) that container to be able to assign %s to a container again.", name, name)
}

// Associate a Swarm ID to the container we are creating.
swarmID := c.generateUniqueID()
config.SetSwarmID(swarmID)
swarmID := config.SwarmID()
if swarmID == "" {
// Associate a Swarm ID to the container we are creating.
swarmID = c.generateUniqueID()
config.SetSwarmID(swarmID)
}

if withImageAffinity {
config.AddAffinity("image==" + config.Image)
Expand Down
Loading

0 comments on commit e121338

Please sign in to comment.