Skip to content

Commit

Permalink
Node Drain Metadata (#10250)
Browse files Browse the repository at this point in the history
  • Loading branch information
cgbaker authored May 7, 2021
1 parent 826ecd9 commit 140e7b3
Show file tree
Hide file tree
Showing 17 changed files with 632 additions and 62 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ IMPROVEMENTS:
* consul/connect: Added job-submission validation for Connect sidecar service and group names [[GH-10455](https://github.com/hashicorp/nomad/pull/10455)]
* consul/connect: Automatically populate `CONSUL_HTTP_ADDR` for connect native tasks in host networking mode. [[GH-10239](https://github.com/hashicorp/nomad/issues/10239)]
* consul/connect: Added `disable_default_tcp_check` field to `connect.sidecar_service` blocks to disable the default TCP listener check for Connect sidecar tasks. [[GH-10531](https://github.com/hashicorp/nomad/pull/10531)]
* core: Persist metadata about most recent drain in Node.LastDrain [[GH-10250](https://github.com/hashicorp/nomad/issues/10250)]
* csi: Added support for jobs to request a unique volume ID per allocation. [[GH-10136](https://github.com/hashicorp/nomad/issues/10136)]
* driver/docker: Added support for optional extra container labels. [[GH-9885](https://github.com/hashicorp/nomad/issues/9885)]
* driver/docker: Added support for configuring default logger behavior in the client configuration. [[GH-10156](https://github.com/hashicorp/nomad/issues/10156)]
Expand Down
2 changes: 1 addition & 1 deletion api/event_stream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,7 @@ func TestEventStream_PayloadValue(t *testing.T) {
require.Contains(t, raw, "Node")
rawNode := raw["Node"]
require.Equal(t, n.ID, rawNode["ID"])
require.NotContains(t, rawNode, "SecretID")
require.Empty(t, rawNode["SecretID"])
}
case <-time.After(5 * time.Second):
require.Fail(t, "failed waiting for event stream event")
Expand Down
55 changes: 53 additions & 2 deletions api/nodes.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,10 @@ const (
// status being ready.
NodeSchedulingEligible = "eligible"
NodeSchedulingIneligible = "ineligible"

DrainStatusDraining DrainStatus = "draining"
DrainStatusComplete DrainStatus = "complete"
DrainStatusCanceled DrainStatus = "canceled"
)

// Nodes is used to query node-related API endpoints
Expand Down Expand Up @@ -67,6 +71,9 @@ type NodeUpdateDrainRequest struct {
// MarkEligible marks the node as eligible for scheduling if removing
// the drain strategy.
MarkEligible bool

// Meta allows operators to specify metadata related to the drain operation
Meta map[string]string
}

// NodeDrainUpdateResponse is used to respond to a node drain update
Expand All @@ -77,14 +84,45 @@ type NodeDrainUpdateResponse struct {
WriteMeta
}

// DrainOptions is used to pass through node drain parameters
type DrainOptions struct {
// DrainSpec contains the drain specification for the node. If non-nil,
// the node will be marked ineligible and begin/continue draining according
// to the provided drain spec.
// If nil, any existing drain operation will be canceled.
DrainSpec *DrainSpec

// MarkEligible indicates whether the node should be marked as eligible when
// canceling a drain operation.
MarkEligible bool

// Meta is metadata that is persisted in Node.LastDrain about this
// drain update.
Meta map[string]string
}

// UpdateDrain is used to update the drain strategy for a given node. If
// markEligible is true and the drain is being removed, the node will be marked
// as having its scheduling being eligible
func (n *Nodes) UpdateDrain(nodeID string, spec *DrainSpec, markEligible bool, q *WriteOptions) (*NodeDrainUpdateResponse, error) {
req := &NodeUpdateDrainRequest{
NodeID: nodeID,
resp, err := n.UpdateDrainOpts(nodeID, &DrainOptions{
DrainSpec: spec,
MarkEligible: markEligible,
Meta: nil,
}, q)
return resp, err
}

// UpdateDrainWithMeta is used to update the drain strategy for a given node. If
// markEligible is true and the drain is being removed, the node will be marked
// as having its scheduling being eligible
func (n *Nodes) UpdateDrainOpts(nodeID string, opts *DrainOptions, q *WriteOptions) (*NodeDrainUpdateResponse,
error) {
req := &NodeUpdateDrainRequest{
NodeID: nodeID,
DrainSpec: opts.DrainSpec,
MarkEligible: opts.MarkEligible,
Meta: opts.Meta,
}

var resp NodeDrainUpdateResponse
Expand Down Expand Up @@ -468,6 +506,17 @@ type HostVolumeInfo struct {
ReadOnly bool
}

type DrainStatus string

// DrainMetadata contains information about the most recent drain operation for a given Node.
type DrainMetadata struct {
StartedAt time.Time
UpdatedAt time.Time
Status DrainStatus
AccessorID string
Meta map[string]string
}

// Node is used to deserialize a node entry.
type Node struct {
ID string
Expand All @@ -494,6 +543,7 @@ type Node struct {
HostVolumes map[string]*HostVolumeInfo
CSIControllerPlugins map[string]*CSIInfo
CSINodePlugins map[string]*CSIInfo
LastDrain *DrainMetadata
CreateIndex uint64
ModifyIndex uint64
}
Expand Down Expand Up @@ -789,6 +839,7 @@ type NodeListStub struct {
Drivers map[string]*DriverInfo
NodeResources *NodeResources `json:",omitempty"`
ReservedResources *NodeReservedResources `json:",omitempty"`
LastDrain *DrainMetadata
CreateIndex uint64
ModifyIndex uint64
}
Expand Down
26 changes: 23 additions & 3 deletions api/nodes_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -241,12 +241,21 @@ func TestNodes_ToggleDrain(t *testing.T) {
out, _, err := nodes.Info(nodeID, nil)
require.Nil(err)
require.False(out.Drain)
require.Nil(out.LastDrain)

// Toggle it on
timeBeforeDrain := time.Now().Add(-1 * time.Second)
spec := &DrainSpec{
Deadline: 10 * time.Second,
}
drainOut, err := nodes.UpdateDrain(nodeID, spec, false, nil)
drainMeta := map[string]string{
"reason": "this node needs to go",
}
drainOut, err := nodes.UpdateDrainOpts(nodeID, &DrainOptions{
DrainSpec: spec,
MarkEligible: false,
Meta: drainMeta,
}, nil)
require.Nil(err)
assertWriteMeta(t, &drainOut.WriteMeta)

Expand All @@ -271,9 +280,20 @@ func TestNodes_ToggleDrain(t *testing.T) {
require.Equal(node.DrainStrategy != nil, node.Drain)
require.True(!node.Drain || node.SchedulingEligibility == NodeSchedulingIneligible) // node.Drain => "ineligible"
if node.Drain && node.SchedulingEligibility == NodeSchedulingIneligible {
require.NotNil(node.LastDrain)
require.Equal(DrainStatusDraining, node.LastDrain.Status)
now := time.Now()
require.False(node.LastDrain.StartedAt.Before(timeBeforeDrain),
"wanted %v <= %v", node.LastDrain.StartedAt, timeBeforeDrain)
require.False(node.LastDrain.StartedAt.After(now),
"wanted %v <= %v", node.LastDrain.StartedAt, now)
require.Equal(drainMeta, node.LastDrain.Meta)
sawDraining = node.ModifyIndex
} else if sawDraining != 0 && node.ModifyIndex > sawDraining &&
!node.Drain && node.SchedulingEligibility == NodeSchedulingIneligible {
} else if sawDraining != 0 && !node.Drain && node.SchedulingEligibility == NodeSchedulingIneligible {
require.NotNil(node.LastDrain)
require.Equal(DrainStatusComplete, node.LastDrain.Status)
require.True(!node.LastDrain.UpdatedAt.Before(node.LastDrain.StartedAt))
require.Equal(drainMeta, node.LastDrain.Meta)
sawDrainComplete = node.ModifyIndex
}
}
Expand Down
1 change: 1 addition & 0 deletions command/agent/node_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,7 @@ func (s *HTTPServer) nodeToggleDrain(resp http.ResponseWriter, req *http.Request
args := structs.NodeUpdateDrainRequest{
NodeID: nodeID,
MarkEligible: drainRequest.MarkEligible,
Meta: drainRequest.Meta,
}
if drainRequest.DrainSpec != nil {
args.DrainStrategy = &structs.DrainStrategy{
Expand Down
60 changes: 43 additions & 17 deletions command/agent/node_endpoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,13 +43,13 @@ func TestHTTP_NodesList(t *testing.T) {
}

// Check for the index
if respW.HeaderMap.Get("X-Nomad-Index") == "" {
if respW.Header().Get("X-Nomad-Index") == "" {
t.Fatalf("missing index")
}
if respW.HeaderMap.Get("X-Nomad-KnownLeader") != "true" {
if respW.Header().Get("X-Nomad-KnownLeader") != "true" {
t.Fatalf("missing known leader")
}
if respW.HeaderMap.Get("X-Nomad-LastContact") == "" {
if respW.Header().Get("X-Nomad-LastContact") == "" {
t.Fatalf("missing last contact")
}

Expand Down Expand Up @@ -100,13 +100,13 @@ func TestHTTP_NodesPrefixList(t *testing.T) {
}

// Check for the index
if respW.HeaderMap.Get("X-Nomad-Index") == "" {
if respW.Header().Get("X-Nomad-Index") == "" {
t.Fatalf("missing index")
}
if respW.HeaderMap.Get("X-Nomad-KnownLeader") != "true" {
if respW.Header().Get("X-Nomad-KnownLeader") != "true" {
t.Fatalf("missing known leader")
}
if respW.HeaderMap.Get("X-Nomad-LastContact") == "" {
if respW.Header().Get("X-Nomad-LastContact") == "" {
t.Fatalf("missing last contact")
}

Expand Down Expand Up @@ -158,7 +158,7 @@ func TestHTTP_NodeForceEval(t *testing.T) {
}

// Check for the index
if respW.HeaderMap.Get("X-Nomad-Index") == "" {
if respW.Header().Get("X-Nomad-Index") == "" {
t.Fatalf("missing index")
}

Expand Down Expand Up @@ -218,13 +218,13 @@ func TestHTTP_NodeAllocations(t *testing.T) {
}

// Check for the index
if respW.HeaderMap.Get("X-Nomad-Index") == "" {
if respW.Header().Get("X-Nomad-Index") == "" {
t.Fatalf("missing index")
}
if respW.HeaderMap.Get("X-Nomad-KnownLeader") != "true" {
if respW.Header().Get("X-Nomad-KnownLeader") != "true" {
t.Fatalf("missing known leader")
}
if respW.HeaderMap.Get("X-Nomad-LastContact") == "" {
if respW.Header().Get("X-Nomad-LastContact") == "" {
t.Fatalf("missing last contact")
}

Expand Down Expand Up @@ -257,8 +257,13 @@ func TestHTTP_NodeDrain(t *testing.T) {
DrainSpec: &api.DrainSpec{
Deadline: 10 * time.Second,
},
Meta: map[string]string{
"reason": "drain",
},
}

beforeDrain := time.Unix(time.Now().Unix(), 0)

// Make the HTTP request
buf := encodeReq(drainReq)
req, err := http.NewRequest("POST", "/v1/node/"+node.ID+"/drain", buf)
Expand All @@ -270,13 +275,13 @@ func TestHTTP_NodeDrain(t *testing.T) {
require.Nil(err)

// Check for the index
require.NotZero(respW.HeaderMap.Get("X-Nomad-Index"))
require.NotEmpty(respW.Header().Get("X-Nomad-Index"))

// Check the response
dresp, ok := obj.(structs.NodeDrainUpdateResponse)
require.True(ok)

t.Logf("response index=%v node_update_index=0x%x", respW.HeaderMap.Get("X-Nomad-Index"),
t.Logf("response index=%v node_update_index=0x%x", respW.Header().Get("X-Nomad-Index"),
dresp.NodeModifyIndex)

// Check that the node has been updated
Expand All @@ -292,8 +297,16 @@ func TestHTTP_NodeDrain(t *testing.T) {
require.Equal(structs.NodeSchedulingIneligible, out.SchedulingEligibility)
}

require.NotNil(out.LastDrain)
require.Equal(map[string]string{
"reason": "drain",
}, out.LastDrain.Meta)

// Make the HTTP request to unset drain
drainReq.DrainSpec = nil
drainReq.Meta = map[string]string{
"cancel_reason": "changed my mind",
}
buf = encodeReq(drainReq)
req, err = http.NewRequest("POST", "/v1/node/"+node.ID+"/drain", buf)
require.Nil(err)
Expand All @@ -306,6 +319,19 @@ func TestHTTP_NodeDrain(t *testing.T) {
out, err = state.NodeByID(nil, node.ID)
require.Nil(err)
require.Nil(out.DrainStrategy)
require.NotNil(out.LastDrain)
require.False(out.LastDrain.StartedAt.Before(beforeDrain))
require.False(out.LastDrain.UpdatedAt.Before(out.LastDrain.StartedAt))
require.Contains([]structs.DrainStatus{structs.DrainStatusCanceled, structs.DrainStatusComplete}, out.LastDrain.Status)
if out.LastDrain.Status == structs.DrainStatusComplete {
require.Equal(map[string]string{
"reason": "drain",
}, out.LastDrain.Meta)
} else if out.LastDrain.Status == structs.DrainStatusCanceled {
require.Equal(map[string]string{
"cancel_reason": "changed my mind",
}, out.LastDrain.Meta)
}
})
}

Expand Down Expand Up @@ -337,7 +363,7 @@ func TestHTTP_NodeEligible(t *testing.T) {
require.Nil(err)

// Check for the index
require.NotZero(respW.HeaderMap.Get("X-Nomad-Index"))
require.NotZero(respW.Header().Get("X-Nomad-Index"))

// Check the response
_, ok := obj.(structs.NodeEligibilityUpdateResponse)
Expand Down Expand Up @@ -403,7 +429,7 @@ func TestHTTP_NodePurge(t *testing.T) {
}

// Check for the index
if respW.HeaderMap.Get("X-Nomad-Index") == "" {
if respW.Header().Get("X-Nomad-Index") == "" {
t.Fatalf("missing index")
}

Expand Down Expand Up @@ -456,13 +482,13 @@ func TestHTTP_NodeQuery(t *testing.T) {
}

// Check for the index
if respW.HeaderMap.Get("X-Nomad-Index") == "" {
if respW.Header().Get("X-Nomad-Index") == "" {
t.Fatalf("missing index")
}
if respW.HeaderMap.Get("X-Nomad-KnownLeader") != "true" {
if respW.Header().Get("X-Nomad-KnownLeader") != "true" {
t.Fatalf("missing known leader")
}
if respW.HeaderMap.Get("X-Nomad-LastContact") == "" {
if respW.Header().Get("X-Nomad-LastContact") == "" {
t.Fatalf("missing last contact")
}

Expand Down
Loading

0 comments on commit 140e7b3

Please sign in to comment.