constraintenforcer: Trigger task restarts when appropriate
The constraint enforcer currently sets task desired state to "shutdown"
directly, which means the orchestrator will consider these tasks already
processed, and won't trigger restarts. While this is appropriate for
global services, replicated services should keep the desired number of
replicas running, so each task shut down by the constraint enforcer
should be restarted somewhere else.

This changes the constraint enforcer to trigger a task shutdown by
updating the actual state rather than desired state. This will cause the
orchestrator to restart the task when necessary. It's not a perfect
solution, because it bends rules about field ownership, and may cause
the replacement task to start before the old one stops. However, it's a
good compromise solution for this problem that doesn't require absorbing
the constraint enforcer into each orchestrator (which wouldn't fit the
model well), or adding a third type of state to every task.
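
In sketch form, the change described above amounts to recording a rejected observed state instead of touching the desired state. The helper below is illustrative only (the name rejectTask and the standalone function shape are not part of the commit), but it uses the same swarmkit calls that appear in the diff further down:

package constraintenforcer

import (
	"time"

	"github.com/docker/swarmkit/api"
	"github.com/docker/swarmkit/manager/state/store"
	"github.com/docker/swarmkit/protobuf/ptypes"
)

// rejectTask marks a task as rejected by updating its observed status.
// DesiredState is deliberately left alone: it is owned by the orchestrator,
// and reacting to the observed state lets the orchestrator's restart logic
// start a replacement elsewhere when the service is replicated.
func rejectTask(tx store.Tx, t *api.Task) error {
	t.Status.State = api.TaskStateRejected
	t.Status.Message = "assigned node no longer meets constraints"
	t.Status.Timestamp = ptypes.MustTimestampProto(time.Now())
	return store.UpdateTask(tx, t)
}

A helper like this would run inside a store update transaction (hence the store.Tx parameter), once per noncompliant task found on the node being checked.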

Also, update the global orchestrator to only restart a task when the
node still meets the constraints.
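
A rough sketch of that check (the function below is illustrative, not the actual swarmkit code; the real logic runs inline in the global orchestrator's tickTasks, shown in the global.go hunk below):

package global

import (
	"github.com/docker/swarmkit/api"
	"github.com/docker/swarmkit/manager/constraint"
)

// shouldRestartInPlace reports whether a global-service task may be restarted
// on its current node: only if the node still satisfies the service's
// placement constraints. When it returns false, the task's desired state is
// set to shutdown instead, so the task is removed rather than restarted.
func shouldRestartInPlace(constraints []constraint.Constraint, node *api.Node) bool {
	return node != nil && constraint.NodeMatches(constraints, node)
}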

Signed-off-by: Aaron Lehmann <[email protected]>
aaronlehmann committed Feb 15, 2017
1 parent 03d5b15 commit 69fcc19
Showing 3 changed files with 35 additions and 8 deletions.
20 changes: 16 additions & 4 deletions manager/orchestrator/constraintenforcer/constraint_enforcer.go
@@ -1,11 +1,14 @@
package constraintenforcer

import (
"time"

"github.com/docker/swarmkit/api"
"github.com/docker/swarmkit/log"
"github.com/docker/swarmkit/manager/constraint"
"github.com/docker/swarmkit/manager/state"
"github.com/docker/swarmkit/manager/state/store"
"github.com/docker/swarmkit/protobuf/ptypes"
)

// ConstraintEnforcer watches for updates to nodes and shuts down tasks that no
@@ -43,22 +46,22 @@ func (ce *ConstraintEnforcer) Run() {
log.L.WithError(err).Error("failed to check nodes for noncompliant tasks")
} else {
for _, node := range nodes {
-ce.shutdownNoncompliantTasks(node)
+ce.rejectNoncompliantTasks(node)
}
}

for {
select {
case event := <-watcher:
node := event.(state.EventUpdateNode).Node
-ce.shutdownNoncompliantTasks(node)
+ce.rejectNoncompliantTasks(node)
case <-ce.stopChan:
return
}
}
}

-func (ce *ConstraintEnforcer) shutdownNoncompliantTasks(node *api.Node) {
+func (ce *ConstraintEnforcer) rejectNoncompliantTasks(node *api.Node) {
// If the availability is "drain", the orchestrator will
// shut down all tasks.
// If the availability is "pause", we shouldn't touch
@@ -134,7 +137,16 @@ func (ce *ConstraintEnforcer) shutdownNoncompliantTasks(node *api.Node) {
return nil
}

-t.DesiredState = api.TaskStateShutdown
+// We set the observed state to
+// REJECTED, rather than the desired
+// state. Desired state is owned by the
+// orchestrator, and setting it directly
+// will bypass actions such as
+// restarting the task on another node
+// (if applicable).
+t.Status.State = api.TaskStateRejected
+t.Status.Message = "assigned node no longer meets constraints"
+t.Status.Timestamp = ptypes.MustTimestampProto(time.Now())
return store.UpdateTask(tx, t)
})
if err != nil {
11 changes: 7 additions & 4 deletions manager/orchestrator/constraintenforcer/constraint_enforcer_test.go
@@ -131,9 +131,10 @@ func TestConstraintEnforcer(t *testing.T) {

go constraintEnforcer.Run()

-// id0 should be killed immediately
-shutdown1 := testutils.WatchShutdownTask(t, watch)
+// id0 should be rejected immediately
+shutdown1 := testutils.WatchTaskUpdate(t, watch)
assert.Equal(t, "id0", shutdown1.ID)
+assert.Equal(t, api.TaskStateRejected, shutdown1.Status.State)

// Change node id1 to a manager
err = s.Update(func(tx store.Tx) error {
@@ -147,8 +148,9 @@
})
assert.NoError(t, err)

-shutdown2 := testutils.WatchShutdownTask(t, watch)
+shutdown2 := testutils.WatchTaskUpdate(t, watch)
assert.Equal(t, "id2", shutdown2.ID)
+assert.Equal(t, api.TaskStateRejected, shutdown2.Status.State)

// Change resources on node id2
err = s.Update(func(tx store.Tx) error {
@@ -162,6 +164,7 @@
})
assert.NoError(t, err)

-shutdown3 := testutils.WatchShutdownTask(t, watch)
+shutdown3 := testutils.WatchTaskUpdate(t, watch)
assert.Equal(t, "id4", shutdown3.ID)
+assert.Equal(t, api.TaskStateRejected, shutdown3.Status.State)
}
12 changes: 12 additions & 0 deletions manager/orchestrator/global/global.go
@@ -513,10 +513,22 @@ func (g *Orchestrator) tickTasks(ctx context.Context) {
if t == nil || t.DesiredState > api.TaskStateRunning {
return nil
}

service := store.GetService(tx, t.ServiceID)
if service == nil {
return nil
}

+node, nodeExists := g.nodes[t.NodeID]
+serviceEntry, serviceExists := g.globalServices[t.ServiceID]
+if !nodeExists || !serviceExists {
+return nil
+}
+if !constraint.NodeMatches(serviceEntry.constraints, node) {
+t.DesiredState = api.TaskStateShutdown
+return store.UpdateTask(tx, t)
+}
+
return g.restarts.Restart(ctx, tx, g.cluster, service, *t)
})
if err != nil {
