From cb84b61bfa6caa21ed7211c3f915e57edbbbfa36 Mon Sep 17 00:00:00 2001 From: Alex Dadgar Date: Thu, 21 Jul 2016 15:22:02 -0700 Subject: [PATCH 1/2] Node.Register handles the case of transistioning to ready and creating evals --- client/client.go | 4 ++- nomad/node_endpoint.go | 31 ++++++++++++++++--- nomad/node_endpoint_test.go | 61 +++++++++++++++++++++++++++++++++++++ 3 files changed, 90 insertions(+), 6 deletions(-) diff --git a/client/client.go b/client/client.go index dd1baa851de..04851e29a25 100644 --- a/client/client.go +++ b/client/client.go @@ -1100,7 +1100,9 @@ func (c *Client) watchAllocations(updates chan *allocUpdates) { // watchNodeUpdates periodically checks for changes to the node attributes or meta map func (c *Client) watchNodeUpdates() { c.logger.Printf("[DEBUG] client: periodically checking for node changes at duration %v", nodeUpdateRetryIntv) - var attrHash, metaHash uint64 + + // Initialize the hashes + _, attrHash, metaHash := c.hasNodeChanged(0, 0) var changed bool for { select { diff --git a/nomad/node_endpoint.go b/nomad/node_endpoint.go index 51e041a7550..67d3d5c29a6 100644 --- a/nomad/node_endpoint.go +++ b/nomad/node_endpoint.go @@ -74,6 +74,16 @@ func (n *Node) Register(args *structs.NodeRegisterRequest, reply *structs.NodeUp return fmt.Errorf("failed to computed node class: %v", err) } + // Look for the node so we can detect a state transistion + snap, err := n.srv.fsm.State().Snapshot() + if err != nil { + return err + } + originalNode, err := snap.NodeByID(args.Node.ID) + if err != nil { + return err + } + // Commit this update via Raft _, index, err := n.srv.raftApply(structs.NodeRegisterRequestType, args) if err != nil { @@ -83,7 +93,12 @@ func (n *Node) Register(args *structs.NodeRegisterRequest, reply *structs.NodeUp reply.NodeModifyIndex = index // Check if we should trigger evaluations - if structs.ShouldDrainNode(args.Node.Status) { + originalStatus := structs.NodeStatusInit + if originalNode != nil { + originalStatus = originalNode.Status + } + transitionToReady := transitionedToReady(args.Node.Status, originalStatus) + if structs.ShouldDrainNode(args.Node.Status) || transitionToReady { evalIDs, evalIndex, err := n.createNodeEvals(args.Node.ID, index) if err != nil { n.srv.logger.Printf("[ERR] nomad.client: eval creation failed: %v", err) @@ -105,7 +120,7 @@ func (n *Node) Register(args *structs.NodeRegisterRequest, reply *structs.NodeUp // Set the reply index reply.Index = index - snap, err := n.srv.fsm.State().Snapshot() + snap, err = n.srv.fsm.State().Snapshot() if err != nil { return err } @@ -236,9 +251,7 @@ func (n *Node) UpdateStatus(args *structs.NodeUpdateStatusRequest, reply *struct } // Check if we should trigger evaluations - initToReady := node.Status == structs.NodeStatusInit && args.Status == structs.NodeStatusReady - terminalToReady := node.Status == structs.NodeStatusDown && args.Status == structs.NodeStatusReady - transitionToReady := initToReady || terminalToReady + transitionToReady := transitionedToReady(args.Status, node.Status) if structs.ShouldDrainNode(args.Status) || transitionToReady { evalIDs, evalIndex, err := n.createNodeEvals(args.NodeID, index) if err != nil { @@ -271,6 +284,14 @@ func (n *Node) UpdateStatus(args *structs.NodeUpdateStatusRequest, reply *struct return nil } +// transitionedToReady is a helper that takes a nodes new and old status and +// returns whether it has transistioned to ready. +func transitionedToReady(newStatus, oldStatus string) bool { + initToReady := oldStatus == structs.NodeStatusInit && newStatus == structs.NodeStatusReady + terminalToReady := oldStatus == structs.NodeStatusDown && newStatus == structs.NodeStatusReady + return initToReady || terminalToReady +} + // UpdateDrain is used to update the drain mode of a client node func (n *Node) UpdateDrain(args *structs.NodeUpdateDrainRequest, reply *structs.NodeDrainUpdateResponse) error { diff --git a/nomad/node_endpoint_test.go b/nomad/node_endpoint_test.go index e796b194c4c..f9471ddd49f 100644 --- a/nomad/node_endpoint_test.go +++ b/nomad/node_endpoint_test.go @@ -153,6 +153,67 @@ func TestClientEndpoint_UpdateStatus(t *testing.T) { } } +func TestClientEndpoint_Register_GetEvals(t *testing.T) { + s1 := testServer(t, nil) + defer s1.Shutdown() + codec := rpcClient(t, s1) + testutil.WaitForLeader(t, s1.RPC) + + // Register a system job. + job := mock.SystemJob() + state := s1.fsm.State() + if err := state.UpsertJob(1, job); err != nil { + t.Fatalf("err: %v", err) + } + + // Create the register request going directly to ready + node := mock.Node() + node.Status = structs.NodeStatusReady + reg := &structs.NodeRegisterRequest{ + Node: node, + WriteRequest: structs.WriteRequest{Region: "global"}, + } + + // Fetch the response + var resp structs.NodeUpdateResponse + if err := msgpackrpc.CallWithCodec(codec, "Node.Register", reg, &resp); err != nil { + t.Fatalf("err: %v", err) + } + + // Check for heartbeat interval + ttl := resp.HeartbeatTTL + if ttl < s1.config.MinHeartbeatTTL || ttl > 2*s1.config.MinHeartbeatTTL { + t.Fatalf("bad: %#v", ttl) + } + + // Check for an eval caused by the system job. + if len(resp.EvalIDs) != 1 { + t.Fatalf("expected one eval; got %#v", resp.EvalIDs) + } + + evalID := resp.EvalIDs[0] + eval, err := state.EvalByID(evalID) + if err != nil { + t.Fatalf("could not get eval %v", evalID) + } + + if eval.Type != "system" { + t.Fatalf("unexpected eval type; got %v; want %q", eval.Type, "system") + } + + // Check for the node in the FSM + out, err := state.NodeByID(node.ID) + if err != nil { + t.Fatalf("err: %v", err) + } + if out == nil { + t.Fatalf("expected node") + } + if out.ModifyIndex != resp.Index { + t.Fatalf("index mis-match") + } +} + func TestClientEndpoint_UpdateStatus_GetEvals(t *testing.T) { s1 := testServer(t, nil) defer s1.Shutdown() From 82ee68a42da2736e68b299561d5892d005a7e79f Mon Sep 17 00:00:00 2001 From: Alex Dadgar Date: Mon, 25 Jul 2016 12:46:18 -0700 Subject: [PATCH 2/2] add down to up test --- nomad/node_endpoint_test.go | 31 +++++++++++++++++++++++++++++++ 1 file changed, 31 insertions(+) diff --git a/nomad/node_endpoint_test.go b/nomad/node_endpoint_test.go index f9471ddd49f..e8ad81a6aa8 100644 --- a/nomad/node_endpoint_test.go +++ b/nomad/node_endpoint_test.go @@ -212,6 +212,37 @@ func TestClientEndpoint_Register_GetEvals(t *testing.T) { if out.ModifyIndex != resp.Index { t.Fatalf("index mis-match") } + + // Transistion it to down and then ready + node.Status = structs.NodeStatusDown + reg = &structs.NodeRegisterRequest{ + Node: node, + WriteRequest: structs.WriteRequest{Region: "global"}, + } + + // Fetch the response + if err := msgpackrpc.CallWithCodec(codec, "Node.Register", reg, &resp); err != nil { + t.Fatalf("err: %v", err) + } + + if len(resp.EvalIDs) != 1 { + t.Fatalf("expected one eval; got %#v", resp.EvalIDs) + } + + node.Status = structs.NodeStatusReady + reg = &structs.NodeRegisterRequest{ + Node: node, + WriteRequest: structs.WriteRequest{Region: "global"}, + } + + // Fetch the response + if err := msgpackrpc.CallWithCodec(codec, "Node.Register", reg, &resp); err != nil { + t.Fatalf("err: %v", err) + } + + if len(resp.EvalIDs) != 1 { + t.Fatalf("expected one eval; got %#v", resp.EvalIDs) + } } func TestClientEndpoint_UpdateStatus_GetEvals(t *testing.T) {