diff --git a/manager/orchestrator/constraintenforcer/constraint_enforcer.go b/manager/orchestrator/constraintenforcer/constraint_enforcer.go index 1a84b05d76..f38e87b710 100644 --- a/manager/orchestrator/constraintenforcer/constraint_enforcer.go +++ b/manager/orchestrator/constraintenforcer/constraint_enforcer.go @@ -1,11 +1,14 @@ package constraintenforcer import ( + "time" + "github.com/docker/swarmkit/api" "github.com/docker/swarmkit/log" "github.com/docker/swarmkit/manager/constraint" "github.com/docker/swarmkit/manager/state" "github.com/docker/swarmkit/manager/state/store" + "github.com/docker/swarmkit/protobuf/ptypes" ) // ConstraintEnforcer watches for updates to nodes and shuts down tasks that no @@ -134,7 +137,16 @@ func (ce *ConstraintEnforcer) shutdownNoncompliantTasks(node *api.Node) { return nil } - t.DesiredState = api.TaskStateShutdown + // We set the observed state to + // REJECTED, rather than the desired + // state. Desired state is owned by the + // orchestrator, and setting it directly + // will bypass actions such as + // restarting the task on another node + // (if applicable). + t.Status.State = api.TaskStateRejected + t.Status.Message = "assigned node no longer meets constraints" + t.Status.Timestamp = ptypes.MustTimestampProto(time.Now()) return store.UpdateTask(tx, t) }) if err != nil { diff --git a/manager/orchestrator/constraintenforcer/constraint_enforcer_test.go b/manager/orchestrator/constraintenforcer/constraint_enforcer_test.go index 526076fdac..25a9560eec 100644 --- a/manager/orchestrator/constraintenforcer/constraint_enforcer_test.go +++ b/manager/orchestrator/constraintenforcer/constraint_enforcer_test.go @@ -131,9 +131,10 @@ func TestConstraintEnforcer(t *testing.T) { go constraintEnforcer.Run() - // id0 should be killed immediately - shutdown1 := testutils.WatchShutdownTask(t, watch) + // id0 should be rejected immediately + shutdown1 := testutils.WatchTaskUpdate(t, watch) assert.Equal(t, "id0", shutdown1.ID) + assert.Equal(t, api.TaskStateRejected, shutdown1.Status.State) // Change node id1 to a manager err = s.Update(func(tx store.Tx) error { @@ -147,8 +148,9 @@ func TestConstraintEnforcer(t *testing.T) { }) assert.NoError(t, err) - shutdown2 := testutils.WatchShutdownTask(t, watch) + shutdown2 := testutils.WatchTaskUpdate(t, watch) assert.Equal(t, "id2", shutdown2.ID) + assert.Equal(t, api.TaskStateRejected, shutdown2.Status.State) // Change resources on node id2 err = s.Update(func(tx store.Tx) error { @@ -162,6 +164,7 @@ func TestConstraintEnforcer(t *testing.T) { }) assert.NoError(t, err) - shutdown3 := testutils.WatchShutdownTask(t, watch) + shutdown3 := testutils.WatchTaskUpdate(t, watch) assert.Equal(t, "id4", shutdown3.ID) + assert.Equal(t, api.TaskStateRejected, shutdown3.Status.State) } diff --git a/manager/orchestrator/global/global.go b/manager/orchestrator/global/global.go index 123b765061..2a8a81bfab 100644 --- a/manager/orchestrator/global/global.go +++ b/manager/orchestrator/global/global.go @@ -513,10 +513,22 @@ func (g *Orchestrator) tickTasks(ctx context.Context) { if t == nil || t.DesiredState > api.TaskStateRunning { return nil } + service := store.GetService(tx, t.ServiceID) if service == nil { return nil } + + node, nodeExists := g.nodes[t.NodeID] + serviceEntry, serviceExists := g.globalServices[t.ServiceID] + if !nodeExists || !serviceExists { + return nil + } + if !constraint.NodeMatches(serviceEntry.constraints, node) { + t.DesiredState = api.TaskStateShutdown + return store.UpdateTask(tx, t) + } + return g.restarts.Restart(ctx, tx, g.cluster, service, *t) }) if err != nil {