From 913427428a43be42b4f23f7d7f1f27ab3f406665 Mon Sep 17 00:00:00 2001 From: Preetha Appan Date: Tue, 25 Jun 2019 15:26:35 -0500 Subject: [PATCH 1/9] Remove compat code associated with many previous versions of nomad This removes compat code for namespaces (0.7), Drain(0.8) and other older features from releases older than Nomad 0.7 --- command/agent/node_endpoint.go | 28 +---- command/alloc_status.go | 2 +- command/helpers.go | 26 ---- command/job_status.go | 6 +- nomad/fsm.go | 54 --------- nomad/fsm_test.go | 78 ------------ nomad/leader.go | 28 ----- nomad/node_endpoint_test.go | 1 - nomad/state/state_store.go | 212 --------------------------------- scheduler/context.go | 12 +- 10 files changed, 7 insertions(+), 440 deletions(-) diff --git a/command/agent/node_endpoint.go b/command/agent/node_endpoint.go index cd65bc7122e..4c30a270886 100644 --- a/command/agent/node_endpoint.go +++ b/command/agent/node_endpoint.go @@ -2,9 +2,7 @@ package agent import ( "net/http" - "strconv" "strings" - "time" "github.com/hashicorp/nomad/api" "github.com/hashicorp/nomad/nomad/structs" @@ -108,30 +106,8 @@ func (s *HTTPServer) nodeToggleDrain(resp http.ResponseWriter, req *http.Request var drainRequest api.NodeUpdateDrainRequest - // COMPAT: Remove in 0.9. Allow the old style enable query param. - // Get the enable parameter - enableRaw := req.URL.Query().Get("enable") - var enable bool - if enableRaw != "" { - var err error - enable, err = strconv.ParseBool(enableRaw) - if err != nil { - return nil, CodedError(400, "invalid enable value") - } - - // Use the force drain to have it keep the same behavior as old clients. - if enable { - drainRequest.DrainSpec = &api.DrainSpec{ - Deadline: -1 * time.Second, - } - } else { - // If drain is disabled on an old client, mark the node as eligible for backwards compatibility - drainRequest.MarkEligible = true - } - } else { - if err := decodeBody(req, &drainRequest); err != nil { - return nil, CodedError(400, err.Error()) - } + if err := decodeBody(req, &drainRequest); err != nil { + return nil, CodedError(400, err.Error()) } args := structs.NodeUpdateDrainRequest{ diff --git a/command/alloc_status.go b/command/alloc_status.go index b656657b8aa..152d359868b 100644 --- a/command/alloc_status.go +++ b/command/alloc_status.go @@ -235,7 +235,7 @@ func formatAllocBasicInfo(alloc *api.Allocation, client *api.Client, uuidLength fmt.Sprintf("Node ID|%s", limit(alloc.NodeID, uuidLength)), fmt.Sprintf("Node Name|%s", alloc.NodeName), fmt.Sprintf("Job ID|%s", alloc.JobID), - fmt.Sprintf("Job Version|%d", getVersion(alloc.Job)), + fmt.Sprintf("Job Version|%d", alloc.Job.Version), fmt.Sprintf("Client Status|%s", alloc.ClientStatus), fmt.Sprintf("Client Description|%s", alloc.ClientDescription), fmt.Sprintf("Desired Status|%s", alloc.DesiredStatus), diff --git a/command/helpers.go b/command/helpers.go index 3d6eab7ff00..364c7392930 100644 --- a/command/helpers.go +++ b/command/helpers.go @@ -438,32 +438,6 @@ func (j *JobGetter) ApiJob(jpath string) (*api.Job, error) { return jobStruct, nil } -// COMPAT: Remove in 0.7.0 -// Nomad 0.6.0 introduces the submit time field so CLI's interacting with -// older versions of Nomad would SEGFAULT as reported here: -// https://github.com/hashicorp/nomad/issues/2918 -// getSubmitTime returns a submit time of the job converting to time.Time -func getSubmitTime(job *api.Job) time.Time { - if job.SubmitTime != nil { - return time.Unix(0, *job.SubmitTime) - } - - return time.Time{} -} - -// COMPAT: Remove in 0.7.0 -// Nomad 0.6.0 introduces job Versions so CLI's interacting with -// older versions of Nomad would SEGFAULT as reported here: -// https://github.com/hashicorp/nomad/issues/2918 -// getVersion returns a version of the job in safely. -func getVersion(job *api.Job) uint64 { - if job.Version != nil { - return *job.Version - } - - return 0 -} - // mergeAutocompleteFlags is used to join multiple flag completion sets. func mergeAutocompleteFlags(flags ...complete.Flags) complete.Flags { merged := make(map[string]complete.Predictor, len(flags)) diff --git a/command/job_status.go b/command/job_status.go index 63fd163e208..858355d11bb 100644 --- a/command/job_status.go +++ b/command/job_status.go @@ -169,7 +169,7 @@ func (c *JobStatusCommand) Run(args []string) int { basic := []string{ fmt.Sprintf("ID|%s", *job.ID), fmt.Sprintf("Name|%s", *job.Name), - fmt.Sprintf("Submit Date|%s", formatTime(getSubmitTime(job))), + fmt.Sprintf("Submit Date|%s", formatTime(time.Unix(0, *job.SubmitTime))), fmt.Sprintf("Type|%s", *job.Type), fmt.Sprintf("Priority|%d", *job.Priority), fmt.Sprintf("Datacenters|%s", strings.Join(job.Datacenters, ",")), @@ -462,7 +462,7 @@ func formatAllocList(allocations []*api.Allocation, verbose bool, uuidLength int limit(alloc.EvalID, uuidLength), limit(alloc.NodeID, uuidLength), alloc.TaskGroup, - getVersion(alloc.Job), + alloc.Job.Version, alloc.DesiredStatus, alloc.ClientStatus, formatUnixNanoTime(alloc.CreateTime), @@ -478,7 +478,7 @@ func formatAllocList(allocations []*api.Allocation, verbose bool, uuidLength int limit(alloc.ID, uuidLength), limit(alloc.NodeID, uuidLength), alloc.TaskGroup, - getVersion(alloc.Job), + alloc.Job.Version, alloc.DesiredStatus, alloc.ClientStatus, createTimePretty, diff --git a/nomad/fsm.go b/nomad/fsm.go index e2f47783bfc..49392d292f7 100644 --- a/nomad/fsm.go +++ b/nomad/fsm.go @@ -1135,11 +1135,6 @@ func (n *nomadFSM) Restore(old io.ReadCloser) error { return err } - // COMPAT: Handle upgrade to v0.7.0 - if eval.Namespace == "" { - eval.Namespace = structs.DefaultNamespace - } - if err := restore.EvalRestore(eval); err != nil { return err } @@ -1150,11 +1145,6 @@ func (n *nomadFSM) Restore(old io.ReadCloser) error { return err } - // COMPAT: Handle upgrade to v0.7.0 - if alloc.Namespace == "" { - alloc.Namespace = structs.DefaultNamespace - } - if err := restore.AllocRestore(alloc); err != nil { return err } @@ -1174,11 +1164,6 @@ func (n *nomadFSM) Restore(old io.ReadCloser) error { return err } - // COMPAT: Handle upgrade to v0.7.0 - if launch.Namespace == "" { - launch.Namespace = structs.DefaultNamespace - } - if err := restore.PeriodicLaunchRestore(launch); err != nil { return err } @@ -1189,11 +1174,6 @@ func (n *nomadFSM) Restore(old io.ReadCloser) error { return err } - // COMPAT: Handle upgrade to v0.7.0 - if summary.Namespace == "" { - summary.Namespace = structs.DefaultNamespace - } - if err := restore.JobSummaryRestore(summary); err != nil { return err } @@ -1213,11 +1193,6 @@ func (n *nomadFSM) Restore(old io.ReadCloser) error { return err } - // COMPAT: Handle upgrade to v0.7.0 - if version.Namespace == "" { - version.Namespace = structs.DefaultNamespace - } - if err := restore.JobVersionRestore(version); err != nil { return err } @@ -1228,11 +1203,6 @@ func (n *nomadFSM) Restore(old io.ReadCloser) error { return err } - // COMPAT: Handle upgrade to v0.7.0 - if deployment.Namespace == "" { - deployment.Namespace = structs.DefaultNamespace - } - if err := restore.DeploymentRestore(deployment); err != nil { return err } @@ -1280,30 +1250,6 @@ func (n *nomadFSM) Restore(old io.ReadCloser) error { restore.Commit() - // Create Job Summaries - // COMPAT 0.4 -> 0.4.1 - // We can remove this in 0.5. This exists so that the server creates job - // summaries if they were not present previously. When users upgrade to 0.5 - // from 0.4.1, the snapshot will contain job summaries so it will be safe to - // remove this block. - index, err := newState.Index("job_summary") - if err != nil { - return fmt.Errorf("couldn't fetch index of job summary table: %v", err) - } - - // If the index is 0 that means there is no job summary in the snapshot so - // we will have to create them - if index == 0 { - // query the latest index - latestIndex, err := newState.LatestIndex() - if err != nil { - return fmt.Errorf("unable to query latest index: %v", index) - } - if err := newState.ReconcileJobSummaries(latestIndex); err != nil { - return fmt.Errorf("error reconciling summaries: %v", err) - } - } - // COMPAT Remove in 0.10 // Clean up active deployments that do not have a job if err := n.failLeakedDeployments(newState); err != nil { diff --git a/nomad/fsm_test.go b/nomad/fsm_test.go index 5d76a76343c..c7873e9124e 100644 --- a/nomad/fsm_test.go +++ b/nomad/fsm_test.go @@ -350,7 +350,6 @@ func TestFSM_BatchUpdateNodeDrain(t *testing.T) { ws := memdb.NewWatchSet() node, err = fsm.State().NodeByID(ws, req.Node.ID) require.Nil(err) - require.True(node.Drain) require.Equal(node.DrainStrategy, strategy) require.Len(node.Events, 2) } @@ -394,46 +393,10 @@ func TestFSM_UpdateNodeDrain(t *testing.T) { ws := memdb.NewWatchSet() node, err = fsm.State().NodeByID(ws, req.Node.ID) require.Nil(err) - require.True(node.Drain) require.Equal(node.DrainStrategy, strategy) require.Len(node.Events, 2) } -func TestFSM_UpdateNodeDrain_Pre08_Compatibility(t *testing.T) { - t.Parallel() - require := require.New(t) - fsm := testFSM(t) - - // Force a node into the state store without eligiblity - node := mock.Node() - node.SchedulingEligibility = "" - require.Nil(fsm.State().UpsertNode(1, node)) - - // Do an old style drain - req := structs.NodeUpdateDrainRequest{ - NodeID: node.ID, - Drain: true, - } - buf, err := structs.Encode(structs.NodeUpdateDrainRequestType, req) - require.Nil(err) - - resp := fsm.Apply(makeLog(buf)) - require.Nil(resp) - - // Verify we have upgraded to a force drain - ws := memdb.NewWatchSet() - node, err = fsm.State().NodeByID(ws, req.NodeID) - require.Nil(err) - require.True(node.Drain) - - expected := &structs.DrainStrategy{ - DrainSpec: structs.DrainSpec{ - Deadline: -1 * time.Second, - }, - } - require.Equal(expected, node.DrainStrategy) -} - func TestFSM_UpdateNodeEligibility(t *testing.T) { t.Parallel() require := require.New(t) @@ -2701,47 +2664,6 @@ func TestFSM_SnapshotRestore_SchedulerConfiguration(t *testing.T) { } -func TestFSM_SnapshotRestore_AddMissingSummary(t *testing.T) { - t.Parallel() - // Add some state - fsm := testFSM(t) - state := fsm.State() - - // make an allocation - alloc := mock.Alloc() - state.UpsertJob(1010, alloc.Job) - state.UpsertAllocs(1011, []*structs.Allocation{alloc}) - - // Delete the summary - state.DeleteJobSummary(1040, alloc.Namespace, alloc.Job.ID) - - // Delete the index - if err := state.RemoveIndex("job_summary"); err != nil { - t.Fatalf("err: %v", err) - } - - fsm2 := testSnapshotRestore(t, fsm) - state2 := fsm2.State() - latestIndex, _ := state.LatestIndex() - - ws := memdb.NewWatchSet() - out, _ := state2.JobSummaryByID(ws, alloc.Namespace, alloc.Job.ID) - expected := structs.JobSummary{ - JobID: alloc.Job.ID, - Namespace: alloc.Job.Namespace, - Summary: map[string]structs.TaskGroupSummary{ - "web": { - Starting: 1, - }, - }, - CreateIndex: 1010, - ModifyIndex: latestIndex, - } - if !reflect.DeepEqual(&expected, out) { - t.Fatalf("expected: %#v, actual: %#v", &expected, out) - } -} - func TestFSM_ReconcileSummaries(t *testing.T) { t.Parallel() // Add some state diff --git a/nomad/leader.go b/nomad/leader.go index 0b08fc60cc9..60ef4e7ee40 100644 --- a/nomad/leader.go +++ b/nomad/leader.go @@ -268,15 +268,6 @@ func (s *Server) establishLeadership(stopCh chan struct{}) error { return err } - // COMPAT 0.4 - 0.4.1 - // Reconcile the summaries of the registered jobs. We reconcile summaries - // only if the server is 0.4.1 since summaries are not present in 0.4 they - // might be incorrect after upgrading to 0.4.1 the summaries might not be - // correct - if err := s.reconcileJobSummaries(); err != nil { - return fmt.Errorf("unable to reconcile job summaries: %v", err) - } - // Start replication of ACLs and Policies if they are enabled, // and we are not the authoritative region. if s.config.ACLEnabled && s.config.Region != s.config.AuthoritativeRegion { @@ -798,25 +789,6 @@ func (s *Server) reconcileMember(member serf.Member) error { return nil } -// reconcileJobSummaries reconciles the summaries of all the jobs registered in -// the system -// COMPAT 0.4 -> 0.4.1 -func (s *Server) reconcileJobSummaries() error { - index, err := s.fsm.state.LatestIndex() - if err != nil { - return fmt.Errorf("unable to read latest index: %v", err) - } - s.logger.Debug("leader reconciling job summaries", "index", index) - - args := &structs.GenericResponse{} - msg := structs.ReconcileJobSummariesRequestType | structs.IgnoreUnknownTypeFlag - if _, _, err = s.raftApply(msg, args); err != nil { - return fmt.Errorf("reconciliation of job summaries failed: %v", err) - } - - return nil -} - // addRaftPeer is used to add a new Raft peer when a Nomad server joins func (s *Server) addRaftPeer(m serf.Member, parts *serverParts) error { // Check for possibility of multiple bootstrap nodes diff --git a/nomad/node_endpoint_test.go b/nomad/node_endpoint_test.go index 6b7b90e2f40..ecb21fc501b 100644 --- a/nomad/node_endpoint_test.go +++ b/nomad/node_endpoint_test.go @@ -894,7 +894,6 @@ func TestClientEndpoint_UpdateDrain(t *testing.T) { ws := memdb.NewWatchSet() out, err := state.NodeByID(ws, node.ID) require.Nil(err) - require.True(out.Drain) require.Equal(strategy.Deadline, out.DrainStrategy.Deadline) require.Len(out.Events, 2) require.Equal(NodeDrainEventDrainSet, out.Events[1].Message) diff --git a/nomad/state/state_store.go b/nomad/state/state_store.go index 09c74286400..dcd33a953a7 100644 --- a/nomad/state/state_store.go +++ b/nomad/state/state_store.go @@ -261,16 +261,6 @@ func (s *StateStore) UpsertPlanResults(index uint64, results *structs.ApplyPlanR s.upsertDeploymentUpdates(index, results.DeploymentUpdates, txn) } - // COMPAT: Nomad versions before 0.7.1 did not include the eval ID when - // applying the plan. Thus while we are upgrading, we ignore updating the - // modify index of evaluations from older plans. - if results.EvalID != "" { - // Update the modify index of the eval id - if err := s.updateEvalModifyIndex(txn, index, results.EvalID); err != nil { - return err - } - } - numAllocs := 0 if len(results.Alloc) > 0 || len(results.NodePreemptions) > 0 { // COMPAT 0.11: This branch will be removed, when Alloc is removed @@ -353,11 +343,6 @@ func (s *StateStore) UpsertJobSummary(index uint64, jobSummary *structs.JobSumma txn := s.db.Txn(true) defer txn.Abort() - // COMPAT 0.7: Upgrade old objects that do not have namespaces - if jobSummary.Namespace == "" { - jobSummary.Namespace = structs.DefaultNamespace - } - // Check if the job summary already exists existing, err := txn.First("job_summary", "id", jobSummary.Namespace, jobSummary.JobID) if err != nil { @@ -393,11 +378,6 @@ func (s *StateStore) DeleteJobSummary(index uint64, namespace, id string) error txn := s.db.Txn(true) defer txn.Abort() - // COMPAT 0.7: Upgrade old objects that do not have namespaces - if namespace == "" { - namespace = structs.DefaultNamespace - } - // Delete the job summary if _, err := txn.DeleteAll("job_summary", "id", namespace, id); err != nil { return fmt.Errorf("deleting job summary failed: %v", err) @@ -428,11 +408,6 @@ func (s *StateStore) upsertDeploymentImpl(index uint64, deployment *structs.Depl return fmt.Errorf("deployment lookup failed: %v", err) } - // COMPAT 0.7: Upgrade old objects that do not have namespaces - if deployment.Namespace == "" { - deployment.Namespace = structs.DefaultNamespace - } - // Setup the indexes correctly if existing != nil { deployment.CreateIndex = existing.(*structs.Deployment).CreateIndex @@ -539,11 +514,6 @@ func (s *StateStore) deploymentByIDImpl(ws memdb.WatchSet, deploymentID string, func (s *StateStore) DeploymentsByJobID(ws memdb.WatchSet, namespace, jobID string, all bool) ([]*structs.Deployment, error) { txn := s.db.Txn(false) - // COMPAT 0.7: Upgrade old objects that do not have namespaces - if namespace == "" { - namespace = structs.DefaultNamespace - } - var job *structs.Job // Read job from state store _, existing, err := txn.FirstWatch("jobs", "id", namespace, jobID) @@ -587,11 +557,6 @@ func (s *StateStore) DeploymentsByJobID(ws memdb.WatchSet, namespace, jobID stri func (s *StateStore) LatestDeploymentByJobID(ws memdb.WatchSet, namespace, jobID string) (*structs.Deployment, error) { txn := s.db.Txn(false) - // COMPAT 0.7: Upgrade old objects that do not have namespaces - if namespace == "" { - namespace = structs.DefaultNamespace - } - // Get an iterator over the deployments iter, err := txn.Get("deployment", "job", namespace, jobID) if err != nil { @@ -820,7 +785,6 @@ func (s *StateStore) updateNodeDrainImpl(txn *memdb.Txn, index uint64, nodeID st } // Update the drain in the copy - copyNode.Drain = drain != nil // COMPAT: Remove in Nomad 0.9 copyNode.DrainStrategy = drain if drain != nil { copyNode.SchedulingEligibility = structs.NodeSchedulingIneligible @@ -1025,11 +989,6 @@ func (s *StateStore) UpsertJobTxn(index uint64, job *structs.Job, txn Txn) error // upsertJobImpl is the implementation for registering a job or updating a job definition func (s *StateStore) upsertJobImpl(index uint64, job *structs.Job, keepVersion bool, txn *memdb.Txn) error { - // COMPAT 0.7: Upgrade old objects that do not have namespaces - if job.Namespace == "" { - job.Namespace = structs.DefaultNamespace - } - // Assert the namespace exists if exists, err := s.namespaceExists(txn, job.Namespace); err != nil { return err @@ -1116,11 +1075,6 @@ func (s *StateStore) DeleteJob(index uint64, namespace, jobID string) error { // DeleteJobTxn is used to deregister a job, like DeleteJob, // but in a transaction. Useful for when making multiple modifications atomically func (s *StateStore) DeleteJobTxn(index uint64, namespace, jobID string, txn Txn) error { - // COMPAT 0.7: Upgrade old objects that do not have namespaces - if namespace == "" { - namespace = structs.DefaultNamespace - } - // Lookup the node existing, err := txn.First("jobs", "id", namespace, jobID) if err != nil { @@ -1164,11 +1118,6 @@ func (s *StateStore) DeleteJobTxn(index uint64, namespace, jobID string, txn Txn // Update the modify index pSummary.ModifyIndex = index - // COMPAT 0.7: Upgrade old objects that do not have namespaces - if pSummary.Namespace == "" { - pSummary.Namespace = structs.DefaultNamespace - } - // Insert the summary if err := txn.Insert("job_summary", pSummary); err != nil { return fmt.Errorf("job summary insert failed: %v", err) @@ -1207,11 +1156,6 @@ func (s *StateStore) DeleteJobTxn(index uint64, namespace, jobID string, txn Txn // deleteJobVersions deletes all versions of the given job. func (s *StateStore) deleteJobVersions(index uint64, job *structs.Job, txn *memdb.Txn) error { - // COMPAT 0.7: Upgrade old objects that do not have namespaces - if job.Namespace == "" { - job.Namespace = structs.DefaultNamespace - } - iter, err := txn.Get("job_version", "id_prefix", job.Namespace, job.ID) if err != nil { return err @@ -1252,11 +1196,6 @@ func (s *StateStore) deleteJobVersions(index uint64, job *structs.Job, txn *memd // upsertJobVersion inserts a job into its historic version table and limits the // number of job versions that are tracked. func (s *StateStore) upsertJobVersion(index uint64, job *structs.Job, txn *memdb.Txn) error { - // COMPAT 0.7: Upgrade old objects that do not have namespaces - if job.Namespace == "" { - job.Namespace = structs.DefaultNamespace - } - // Insert the job if err := txn.Insert("job_version", job); err != nil { return fmt.Errorf("failed to insert job into job_version table: %v", err) @@ -1313,11 +1252,6 @@ func (s *StateStore) JobByID(ws memdb.WatchSet, namespace, id string) (*structs. // JobByIDTxn is used to lookup a job by its ID, like JobByID. JobByID returns the job version // accessible through in the transaction func (s *StateStore) JobByIDTxn(ws memdb.WatchSet, namespace, id string, txn Txn) (*structs.Job, error) { - // COMPAT 0.7: Upgrade old objects that do not have namespaces - if namespace == "" { - namespace = structs.DefaultNamespace - } - watchCh, existing, err := txn.FirstWatch("jobs", "id", namespace, id) if err != nil { return nil, fmt.Errorf("job lookup failed: %v", err) @@ -1334,11 +1268,6 @@ func (s *StateStore) JobByIDTxn(ws memdb.WatchSet, namespace, id string, txn Txn func (s *StateStore) JobsByIDPrefix(ws memdb.WatchSet, namespace, id string) (memdb.ResultIterator, error) { txn := s.db.Txn(false) - // COMPAT 0.7: Upgrade old objects that do not have namespaces - if namespace == "" { - namespace = structs.DefaultNamespace - } - iter, err := txn.Get("jobs", "id_prefix", namespace, id) if err != nil { return nil, fmt.Errorf("job lookup failed: %v", err) @@ -1353,11 +1282,6 @@ func (s *StateStore) JobsByIDPrefix(ws memdb.WatchSet, namespace, id string) (me func (s *StateStore) JobVersionsByID(ws memdb.WatchSet, namespace, id string) ([]*structs.Job, error) { txn := s.db.Txn(false) - // COMPAT 0.7: Upgrade old objects that do not have namespaces - if namespace == "" { - namespace = structs.DefaultNamespace - } - return s.jobVersionByID(txn, &ws, namespace, id) } @@ -1365,11 +1289,6 @@ func (s *StateStore) JobVersionsByID(ws memdb.WatchSet, namespace, id string) ([ // versions of a job and is called under an existing transaction. A watch set // can optionally be passed in to add the job histories to the watch set. func (s *StateStore) jobVersionByID(txn *memdb.Txn, ws *memdb.WatchSet, namespace, id string) ([]*structs.Job, error) { - // COMPAT 0.7: Upgrade old objects that do not have namespaces - if namespace == "" { - namespace = structs.DefaultNamespace - } - // Get all the historic jobs for this ID iter, err := txn.Get("job_version", "id_prefix", namespace, id) if err != nil { @@ -1407,10 +1326,6 @@ func (s *StateStore) jobVersionByID(txn *memdb.Txn, ws *memdb.WatchSet, namespac // JobByIDAndVersion returns the job identified by its ID and Version. The // passed watchset may be nil. func (s *StateStore) JobByIDAndVersion(ws memdb.WatchSet, namespace, id string, version uint64) (*structs.Job, error) { - // COMPAT 0.7: Upgrade old objects that do not have namespaces - if namespace == "" { - namespace = structs.DefaultNamespace - } txn := s.db.Txn(false) return s.jobByIDAndVersionImpl(ws, namespace, id, version, txn) } @@ -1419,10 +1334,6 @@ func (s *StateStore) JobByIDAndVersion(ws memdb.WatchSet, namespace, id string, // passed watchset may be nil. func (s *StateStore) jobByIDAndVersionImpl(ws memdb.WatchSet, namespace, id string, version uint64, txn *memdb.Txn) (*structs.Job, error) { - // COMPAT 0.7: Upgrade old objects that do not have namespaces - if namespace == "" { - namespace = structs.DefaultNamespace - } watchCh, existing, err := txn.FirstWatch("job_version", "id", namespace, id, version) if err != nil { @@ -1537,11 +1448,6 @@ func (s *StateStore) JobsByGC(ws memdb.WatchSet, gc bool) (memdb.ResultIterator, func (s *StateStore) JobSummaryByID(ws memdb.WatchSet, namespace, jobID string) (*structs.JobSummary, error) { txn := s.db.Txn(false) - // COMPAT 0.7: Upgrade old objects that do not have namespaces - if namespace == "" { - namespace = structs.DefaultNamespace - } - watchCh, existing, err := txn.FirstWatch("job_summary", "id", namespace, jobID) if err != nil { return nil, err @@ -1576,11 +1482,6 @@ func (s *StateStore) JobSummaries(ws memdb.WatchSet) (memdb.ResultIterator, erro func (s *StateStore) JobSummaryByPrefix(ws memdb.WatchSet, namespace, id string) (memdb.ResultIterator, error) { txn := s.db.Txn(false) - // COMPAT 0.7: Upgrade old objects that do not have namespaces - if namespace == "" { - namespace = structs.DefaultNamespace - } - iter, err := txn.Get("job_summary", "id_prefix", namespace, id) if err != nil { return nil, fmt.Errorf("eval lookup failed: %v", err) @@ -1596,11 +1497,6 @@ func (s *StateStore) UpsertPeriodicLaunch(index uint64, launch *structs.Periodic txn := s.db.Txn(true) defer txn.Abort() - // COMPAT 0.7: Upgrade old objects that do not have namespaces - if launch.Namespace == "" { - launch.Namespace = structs.DefaultNamespace - } - // Check if the job already exists existing, err := txn.First("periodic_launch", "id", launch.Namespace, launch.ID) if err != nil { @@ -1643,11 +1539,6 @@ func (s *StateStore) DeletePeriodicLaunch(index uint64, namespace, jobID string) // DeletePeriodicLaunchTxn is used to delete the periodic launch, like DeletePeriodicLaunch // but in a transaction. Useful for when making multiple modifications atomically func (s *StateStore) DeletePeriodicLaunchTxn(index uint64, namespace, jobID string, txn Txn) error { - // COMPAT 0.7: Upgrade old objects that do not have namespaces - if namespace == "" { - namespace = structs.DefaultNamespace - } - // Lookup the launch existing, err := txn.First("periodic_launch", "id", namespace, jobID) if err != nil { @@ -1673,11 +1564,6 @@ func (s *StateStore) DeletePeriodicLaunchTxn(index uint64, namespace, jobID stri func (s *StateStore) PeriodicLaunchByID(ws memdb.WatchSet, namespace, id string) (*structs.PeriodicLaunch, error) { txn := s.db.Txn(false) - // COMPAT 0.7: Upgrade old objects that do not have namespaces - if namespace == "" { - namespace = structs.DefaultNamespace - } - watchCh, existing, err := txn.FirstWatch("periodic_launch", "id", namespace, id) if err != nil { return nil, fmt.Errorf("periodic launch lookup failed: %v", err) @@ -1751,11 +1637,6 @@ func (s *StateStore) nestedUpsertEval(txn *memdb.Txn, index uint64, eval *struct return fmt.Errorf("eval lookup failed: %v", err) } - // COMPAT 0.7: Upgrade old objects that do not have namespaces - if eval.Namespace == "" { - eval.Namespace = structs.DefaultNamespace - } - // Update the indexes if existing != nil { eval.CreateIndex = existing.(*structs.Evaluation).CreateIndex @@ -1787,11 +1668,6 @@ func (s *StateStore) nestedUpsertEval(txn *memdb.Txn, index uint64, eval *struct // Insert the job summary if hasSummaryChanged { - // COMPAT 0.7: Upgrade old objects that do not have namespaces - if js.Namespace == "" { - js.Namespace = structs.DefaultNamespace - } - js.ModifyIndex = index if err := txn.Insert("job_summary", js); err != nil { return fmt.Errorf("job summary insert failed: %v", err) @@ -1826,11 +1702,6 @@ func (s *StateStore) nestedUpsertEval(txn *memdb.Txn, index uint64, eval *struct newEval.StatusDescription = fmt.Sprintf("evaluation %q successful", newEval.ID) newEval.ModifyIndex = index - // COMPAT 0.7: Upgrade old objects that do not have namespaces - if newEval.Namespace == "" { - newEval.Namespace = structs.DefaultNamespace - } - if err := txn.Insert("evals", newEval); err != nil { return fmt.Errorf("eval insert failed: %v", err) } @@ -1960,11 +1831,6 @@ func (s *StateStore) EvalsByIDPrefix(ws memdb.WatchSet, namespace, id string) (m ws.Add(iter.WatchCh()) - // COMPAT 0.7: Upgrade old objects that do not have namespaces - if namespace == "" { - namespace = structs.DefaultNamespace - } - // Wrap the iterator in a filter wrap := memdb.NewFilterIterator(iter, evalNamespaceFilter(namespace)) return wrap, nil @@ -1987,11 +1853,6 @@ func evalNamespaceFilter(namespace string) func(interface{}) bool { func (s *StateStore) EvalsByJob(ws memdb.WatchSet, namespace, jobID string) ([]*structs.Evaluation, error) { txn := s.db.Txn(false) - // COMPAT 0.7: Upgrade old objects that do not have namespaces - if namespace == "" { - namespace = structs.DefaultNamespace - } - // Get an iterator over the node allocations iter, err := txn.Get("evals", "job_prefix", namespace, jobID) if err != nil { @@ -2092,11 +1953,6 @@ func (s *StateStore) nestedUpdateAllocFromClient(txn *memdb.Txn, index uint64, a // Copy everything from the existing allocation copyAlloc := exist.Copy() - // COMPAT 0.7: Upgrade old objects that do not have namespaces - if copyAlloc.Namespace == "" { - copyAlloc.Namespace = structs.DefaultNamespace - } - // Pull in anything the client is the authority on copyAlloc.ClientStatus = alloc.ClientStatus copyAlloc.ClientDescription = alloc.ClientDescription @@ -2230,11 +2086,6 @@ func (s *StateStore) upsertAllocsImpl(index uint64, allocs []*structs.Allocation } } - // COMPAT 0.7: Upgrade old objects that do not have namespaces - if alloc.Namespace == "" { - alloc.Namespace = structs.DefaultNamespace - } - // OPTIMIZATION: // These should be given a map of new to old allocation and the updates // should be one on all changes. The current implementation causes O(n) @@ -2459,11 +2310,6 @@ func (s *StateStore) AllocsByNodeTerminal(ws memdb.WatchSet, node string, termin func (s *StateStore) AllocsByJob(ws memdb.WatchSet, namespace, jobID string, all bool) ([]*structs.Allocation, error) { txn := s.db.Txn(false) - // COMPAT 0.7: Upgrade old objects that do not have namespaces - if namespace == "" { - namespace = structs.DefaultNamespace - } - // Get the job var job *structs.Job rawJob, err := txn.First("jobs", "id", namespace, jobID) @@ -2752,11 +2598,6 @@ func (s *StateStore) updateDeploymentStatusImpl(index uint64, u *structs.Deploym copy.StatusDescription = u.StatusDescription copy.ModifyIndex = index - // COMPAT 0.7: Upgrade old objects that do not have namespaces - if copy.Namespace == "" { - copy.Namespace = structs.DefaultNamespace - } - // Insert the deployment if err := txn.Insert("deployment", copy); err != nil { return err @@ -2783,11 +2624,6 @@ func (s *StateStore) UpdateJobStability(index uint64, namespace, jobID string, j txn := s.db.Txn(true) defer txn.Abort() - // COMPAT 0.7: Upgrade old objects that do not have namespaces - if namespace == "" { - namespace = structs.DefaultNamespace - } - if err := s.updateJobStabilityImpl(index, namespace, jobID, jobVersion, stable, txn); err != nil { return err } @@ -2798,11 +2634,6 @@ func (s *StateStore) UpdateJobStability(index uint64, namespace, jobID string, j // updateJobStabilityImpl updates the stability of the given job and version func (s *StateStore) updateJobStabilityImpl(index uint64, namespace, jobID string, jobVersion uint64, stable bool, txn *memdb.Txn) error { - // COMPAT 0.7: Upgrade old objects that do not have namespaces - if namespace == "" { - namespace = structs.DefaultNamespace - } - // Get the job that is referenced job, err := s.jobByIDAndVersionImpl(nil, namespace, jobID, jobVersion, txn) if err != nil { @@ -3231,11 +3062,6 @@ func (s *StateStore) ReconcileJobSummaries(index uint64) error { summary.Summary[tg.Name] = structs.TaskGroupSummary{} } - // COMPAT 0.7: Upgrade old objects that do not have namespaces - if job.Namespace == "" { - job.Namespace = structs.DefaultNamespace - } - // Find all the allocations for the jobs iterAllocs, err := txn.Get("allocs", "job", job.Namespace, job.ID) if err != nil { @@ -3299,10 +3125,6 @@ func (s *StateStore) ReconcileJobSummaries(index uint64) error { func (s *StateStore) setJobStatuses(index uint64, txn *memdb.Txn, jobs map[structs.NamespacedID]string, evalDelete bool) error { for tuple, forceStatus := range jobs { - // COMPAT 0.7: Upgrade old objects that do not have namespaces - if tuple.Namespace == "" { - tuple.Namespace = structs.DefaultNamespace - } existing, err := txn.First("jobs", "id", tuple.Namespace, tuple.ID) if err != nil { @@ -3355,11 +3177,6 @@ func (s *StateStore) setJobStatus(index uint64, txn *memdb.Txn, updated.Status = newStatus updated.ModifyIndex = index - // COMPAT 0.7: Upgrade old objects that do not have namespaces - if updated.Namespace == "" { - updated.Namespace = structs.DefaultNamespace - } - // Insert the job if err := txn.Insert("jobs", updated); err != nil { return fmt.Errorf("job insert failed: %v", err) @@ -3385,11 +3202,6 @@ func (s *StateStore) setJobStatus(index uint64, txn *memdb.Txn, pSummary.Children = new(structs.JobChildrenSummary) } - // COMPAT 0.7: Upgrade old objects that do not have namespaces - if pSummary.Namespace == "" { - pSummary.Namespace = structs.DefaultNamespace - } - // Determine the transition and update the correct fields children := pSummary.Children @@ -3436,11 +3248,6 @@ func (s *StateStore) setJobStatus(index uint64, txn *memdb.Txn, } func (s *StateStore) getJobStatus(txn *memdb.Txn, job *structs.Job, evalDelete bool) (string, error) { - // COMPAT 0.7: Upgrade old objects that do not have namespaces - if job.Namespace == "" { - job.Namespace = structs.DefaultNamespace - } - // System, Periodic and Parameterized jobs are running until explicitly // stopped if job.Type == structs.JobTypeSystem || job.IsParameterized() || job.IsPeriodic() { @@ -3499,11 +3306,6 @@ func (s *StateStore) getJobStatus(txn *memdb.Txn, job *structs.Job, evalDelete b func (s *StateStore) updateSummaryWithJob(index uint64, job *structs.Job, txn *memdb.Txn) error { - // COMPAT 0.7: Upgrade old objects that do not have namespaces - if job.Namespace == "" { - job.Namespace = structs.DefaultNamespace - } - // Update the job summary summaryRaw, err := txn.First("job_summary", "id", job.Namespace, job.ID) if err != nil { @@ -3543,11 +3345,6 @@ func (s *StateStore) updateSummaryWithJob(index uint64, job *structs.Job, if hasSummaryChanged { summary.ModifyIndex = index - // COMPAT 0.7: Upgrade old objects that do not have namespaces - if summary.Namespace == "" { - summary.Namespace = structs.DefaultNamespace - } - // Update the indexes table for job summary if err := txn.Insert("index", &IndexEntry{"job_summary", index}); err != nil { return fmt.Errorf("index update failed: %v", err) @@ -3662,10 +3459,6 @@ func (s *StateStore) updateSummaryWithAlloc(index uint64, alloc *structs.Allocat if alloc.Job == nil { return nil } - // COMPAT 0.7: Upgrade old objects that do not have namespaces - if alloc.Namespace == "" { - alloc.Namespace = structs.DefaultNamespace - } summaryRaw, err := txn.First("job_summary", "id", alloc.Namespace, alloc.JobID) if err != nil { @@ -3761,11 +3554,6 @@ func (s *StateStore) updateSummaryWithAlloc(index uint64, alloc *structs.Allocat if summaryChanged { jobSummary.ModifyIndex = index - // COMPAT 0.7: Upgrade old objects that do not have namespaces - if jobSummary.Namespace == "" { - jobSummary.Namespace = structs.DefaultNamespace - } - // Update the indexes table for job summary if err := txn.Insert("index", &IndexEntry{"job_summary", index}); err != nil { return fmt.Errorf("index update failed: %v", err) diff --git a/scheduler/context.go b/scheduler/context.go index 07cf9fdd6f3..e62509a82dd 100644 --- a/scheduler/context.go +++ b/scheduler/context.go @@ -282,10 +282,7 @@ func (e *EvalEligibility) GetClasses() map[string]bool { // JobStatus returns the eligibility status of the job. func (e *EvalEligibility) JobStatus(class string) ComputedClassFeasibility { - // COMPAT: Computed node class was introduced in 0.3. Clients running < 0.3 - // will not have a computed class. The safest value to return is the escaped - // case, since it disables any optimization. - if e.jobEscaped || class == "" { + if e.jobEscaped { return EvalComputedClassEscaped } @@ -307,13 +304,6 @@ func (e *EvalEligibility) SetJobEligibility(eligible bool, class string) { // TaskGroupStatus returns the eligibility status of the task group. func (e *EvalEligibility) TaskGroupStatus(tg, class string) ComputedClassFeasibility { - // COMPAT: Computed node class was introduced in 0.3. Clients running < 0.3 - // will not have a computed class. The safest value to return is the escaped - // case, since it disables any optimization. - if class == "" { - return EvalComputedClassEscaped - } - if escaped, ok := e.tgEscapedConstraints[tg]; ok { if escaped { return EvalComputedClassEscaped From 4df53b4026d81864fcf4aede1ebe9fd1380379a0 Mon Sep 17 00:00:00 2001 From: Preetha Appan Date: Tue, 25 Jun 2019 19:41:09 -0500 Subject: [PATCH 2/9] newline --- nomad/node_endpoint_test.go | 1 + 1 file changed, 1 insertion(+) diff --git a/nomad/node_endpoint_test.go b/nomad/node_endpoint_test.go index ecb21fc501b..1c748da8312 100644 --- a/nomad/node_endpoint_test.go +++ b/nomad/node_endpoint_test.go @@ -891,6 +891,7 @@ func TestClientEndpoint_UpdateDrain(t *testing.T) { // Check for the node in the FSM state := s1.fsm.State() + ws := memdb.NewWatchSet() out, err := state.NodeByID(ws, node.ID) require.Nil(err) From ea77c3af3945661df9948a52054097976e11baee Mon Sep 17 00:00:00 2001 From: Preetha Appan Date: Wed, 26 Jun 2019 13:59:14 -0500 Subject: [PATCH 3/9] Restore accidentally deleted block --- nomad/state/state_store.go | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/nomad/state/state_store.go b/nomad/state/state_store.go index dcd33a953a7..6f4080d0ae4 100644 --- a/nomad/state/state_store.go +++ b/nomad/state/state_store.go @@ -261,6 +261,13 @@ func (s *StateStore) UpsertPlanResults(index uint64, results *structs.ApplyPlanR s.upsertDeploymentUpdates(index, results.DeploymentUpdates, txn) } + if results.EvalID != "" { + // Update the modify index of the eval id + if err := s.updateEvalModifyIndex(txn, index, results.EvalID); err != nil { + return err + } + } + numAllocs := 0 if len(results.Alloc) > 0 || len(results.NodePreemptions) > 0 { // COMPAT 0.11: This branch will be removed, when Alloc is removed From a5c3b6e8cbaf75de5e0fbacfae70be81ed0afc89 Mon Sep 17 00:00:00 2001 From: Preetha Appan Date: Wed, 26 Jun 2019 16:12:07 -0500 Subject: [PATCH 4/9] Fix node drain test --- nomad/node_endpoint_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/nomad/node_endpoint_test.go b/nomad/node_endpoint_test.go index 1c748da8312..d7d5844341e 100644 --- a/nomad/node_endpoint_test.go +++ b/nomad/node_endpoint_test.go @@ -2682,7 +2682,7 @@ func TestClientEndpoint_ListNodes_Blocking(t *testing.T) { if resp2.Index != 3 { t.Fatalf("Bad index: %d %d", resp2.Index, 3) } - if len(resp2.Nodes) != 1 || !resp2.Nodes[0].Drain { + if len(resp2.Nodes) != 1 { t.Fatalf("bad: %#v", resp2.Nodes) } From bead05f05f1e9ca1665b64fd6a16c83fd3693a52 Mon Sep 17 00:00:00 2001 From: Preetha Appan Date: Wed, 26 Jun 2019 16:30:53 -0500 Subject: [PATCH 5/9] Fix more tests --- nomad/state/state_store_test.go | 2 -- scheduler/context_test.go | 10 ---------- 2 files changed, 12 deletions(-) diff --git a/nomad/state/state_store_test.go b/nomad/state/state_store_test.go index d21e4bbdefb..7e3539f3ec1 100644 --- a/nomad/state/state_store_test.go +++ b/nomad/state/state_store_test.go @@ -935,7 +935,6 @@ func TestStateStore_BatchUpdateNodeDrain(t *testing.T) { for _, id := range []string{n1.ID, n2.ID} { out, err := state.NodeByID(ws, id) require.Nil(err) - require.True(out.Drain) require.NotNil(out.DrainStrategy) require.Equal(out.DrainStrategy, expectedDrain) require.Len(out.Events, 2) @@ -978,7 +977,6 @@ func TestStateStore_UpdateNodeDrain_Node(t *testing.T) { ws = memdb.NewWatchSet() out, err := state.NodeByID(ws, node.ID) require.Nil(err) - require.True(out.Drain) require.NotNil(out.DrainStrategy) require.Equal(out.DrainStrategy, expectedDrain) require.Len(out.Events, 2) diff --git a/scheduler/context_test.go b/scheduler/context_test.go index e7c6927c725..e5e0be7a84c 100644 --- a/scheduler/context_test.go +++ b/scheduler/context_test.go @@ -168,11 +168,6 @@ func TestEvalEligibility_JobStatus(t *testing.T) { if status := e.JobStatus(cc); status != EvalComputedClassEligible { t.Fatalf("JobStatus() returned %v; want %v", status, EvalComputedClassEligible) } - - // Check that if I pass an empty class it returns escaped - if status := e.JobStatus(""); status != EvalComputedClassEscaped { - t.Fatalf("JobStatus() returned %v; want %v", status, EvalComputedClassEscaped) - } } func TestEvalEligibility_TaskGroupStatus(t *testing.T) { @@ -195,11 +190,6 @@ func TestEvalEligibility_TaskGroupStatus(t *testing.T) { if status := e.TaskGroupStatus(tg, cc); status != EvalComputedClassEligible { t.Fatalf("TaskGroupStatus() returned %v; want %v", status, EvalComputedClassEligible) } - - // Check that if I pass an empty class it returns escaped - if status := e.TaskGroupStatus(tg, ""); status != EvalComputedClassEscaped { - t.Fatalf("TaskGroupStatus() returned %v; want %v", status, EvalComputedClassEscaped) - } } func TestEvalEligibility_SetJob(t *testing.T) { From 27edf8f5fceab5ffa9667f39b8fa84334595485d Mon Sep 17 00:00:00 2001 From: Preetha Appan Date: Wed, 26 Jun 2019 16:59:23 -0500 Subject: [PATCH 6/9] remove now unneeded test --- command/agent/node_endpoint_test.go | 55 ----------------------------- 1 file changed, 55 deletions(-) diff --git a/command/agent/node_endpoint_test.go b/command/agent/node_endpoint_test.go index 978c266f406..6c94af9effb 100644 --- a/command/agent/node_endpoint_test.go +++ b/command/agent/node_endpoint_test.go @@ -302,61 +302,6 @@ func TestHTTP_NodeDrain(t *testing.T) { }) } -// Tests backwards compatibility code to support pre 0.8 clients -func TestHTTP_NodeDrain_Compat(t *testing.T) { - t.Parallel() - require := require.New(t) - httpTest(t, nil, func(s *TestAgent) { - // Create the node - node := mock.Node() - args := structs.NodeRegisterRequest{ - Node: node, - WriteRequest: structs.WriteRequest{Region: "global"}, - } - var resp structs.NodeUpdateResponse - require.Nil(s.Agent.RPC("Node.Register", &args, &resp)) - - // Make the HTTP request - req, err := http.NewRequest("POST", "/v1/node/"+node.ID+"/drain?enable=true", nil) - require.Nil(err) - respW := httptest.NewRecorder() - - // Make the request - obj, err := s.Server.NodeSpecificRequest(respW, req) - require.Nil(err) - - // Check for the index - require.NotZero(respW.HeaderMap.Get("X-Nomad-Index")) - - // Check the response - _, ok := obj.(structs.NodeDrainUpdateResponse) - require.True(ok) - - // Check that the node has been updated - state := s.Agent.server.State() - out, err := state.NodeByID(nil, node.ID) - require.Nil(err) - require.True(out.Drain) - require.NotNil(out.DrainStrategy) - require.Equal(-1*time.Second, out.DrainStrategy.Deadline) - - // Make the HTTP request to unset drain - req, err = http.NewRequest("POST", "/v1/node/"+node.ID+"/drain?enable=false", nil) - require.Nil(err) - respW = httptest.NewRecorder() - - // Make the request - _, err = s.Server.NodeSpecificRequest(respW, req) - require.Nil(err) - - out, err = state.NodeByID(nil, node.ID) - require.Nil(err) - require.False(out.Drain) - require.Nil(out.DrainStrategy) - require.Equal(structs.NodeSchedulingEligible, out.SchedulingEligibility) - }) -} - func TestHTTP_NodeEligible(t *testing.T) { t.Parallel() require := require.New(t) From 154e09e8ca52899f95f280abc9a51d3ac57e7eb2 Mon Sep 17 00:00:00 2001 From: Preetha Appan Date: Wed, 26 Jun 2019 17:33:51 -0500 Subject: [PATCH 7/9] one more drain test --- command/agent/node_endpoint_test.go | 2 -- 1 file changed, 2 deletions(-) diff --git a/command/agent/node_endpoint_test.go b/command/agent/node_endpoint_test.go index 6c94af9effb..8bc4dc95e3c 100644 --- a/command/agent/node_endpoint_test.go +++ b/command/agent/node_endpoint_test.go @@ -280,7 +280,6 @@ func TestHTTP_NodeDrain(t *testing.T) { state := s.Agent.server.State() out, err := state.NodeByID(nil, node.ID) require.Nil(err) - require.True(out.Drain) require.NotNil(out.DrainStrategy) require.Equal(10*time.Second, out.DrainStrategy.Deadline) @@ -297,7 +296,6 @@ func TestHTTP_NodeDrain(t *testing.T) { out, err = state.NodeByID(nil, node.ID) require.Nil(err) - require.False(out.Drain) require.Nil(out.DrainStrategy) }) } From 4fa6688f73143c0fcbc24890c59cdda981a7166b Mon Sep 17 00:00:00 2001 From: Preetha Appan Date: Mon, 1 Jul 2019 15:12:01 -0500 Subject: [PATCH 8/9] Undo removal of node drain compat changes Decided to remove that in 0.10 --- command/agent/node_endpoint.go | 28 +++++++++++++- command/agent/node_endpoint_test.go | 57 +++++++++++++++++++++++++++++ nomad/fsm_test.go | 37 +++++++++++++++++++ nomad/node_endpoint_test.go | 4 +- nomad/state/state_store_test.go | 2 + 5 files changed, 124 insertions(+), 4 deletions(-) diff --git a/command/agent/node_endpoint.go b/command/agent/node_endpoint.go index 4c30a270886..6b6f99750db 100644 --- a/command/agent/node_endpoint.go +++ b/command/agent/node_endpoint.go @@ -2,7 +2,9 @@ package agent import ( "net/http" + "strconv" "strings" + "time" "github.com/hashicorp/nomad/api" "github.com/hashicorp/nomad/nomad/structs" @@ -106,8 +108,30 @@ func (s *HTTPServer) nodeToggleDrain(resp http.ResponseWriter, req *http.Request var drainRequest api.NodeUpdateDrainRequest - if err := decodeBody(req, &drainRequest); err != nil { - return nil, CodedError(400, err.Error()) + // COMPAT: Remove in 0.10. Allow the old style enable query param. + // Get the enable parameter + enableRaw := req.URL.Query().Get("enable") + var enable bool + if enableRaw != "" { + var err error + enable, err = strconv.ParseBool(enableRaw) + if err != nil { + return nil, CodedError(400, "invalid enable value") + } + + // Use the force drain to have it keep the same behavior as old clients. + if enable { + drainRequest.DrainSpec = &api.DrainSpec{ + Deadline: -1 * time.Second, + } + } else { + // If drain is disabled on an old client, mark the node as eligible for backwards compatibility + drainRequest.MarkEligible = true + } + } else { + if err := decodeBody(req, &drainRequest); err != nil { + return nil, CodedError(400, err.Error()) + } } args := structs.NodeUpdateDrainRequest{ diff --git a/command/agent/node_endpoint_test.go b/command/agent/node_endpoint_test.go index 8bc4dc95e3c..978c266f406 100644 --- a/command/agent/node_endpoint_test.go +++ b/command/agent/node_endpoint_test.go @@ -280,6 +280,7 @@ func TestHTTP_NodeDrain(t *testing.T) { state := s.Agent.server.State() out, err := state.NodeByID(nil, node.ID) require.Nil(err) + require.True(out.Drain) require.NotNil(out.DrainStrategy) require.Equal(10*time.Second, out.DrainStrategy.Deadline) @@ -296,10 +297,66 @@ func TestHTTP_NodeDrain(t *testing.T) { out, err = state.NodeByID(nil, node.ID) require.Nil(err) + require.False(out.Drain) require.Nil(out.DrainStrategy) }) } +// Tests backwards compatibility code to support pre 0.8 clients +func TestHTTP_NodeDrain_Compat(t *testing.T) { + t.Parallel() + require := require.New(t) + httpTest(t, nil, func(s *TestAgent) { + // Create the node + node := mock.Node() + args := structs.NodeRegisterRequest{ + Node: node, + WriteRequest: structs.WriteRequest{Region: "global"}, + } + var resp structs.NodeUpdateResponse + require.Nil(s.Agent.RPC("Node.Register", &args, &resp)) + + // Make the HTTP request + req, err := http.NewRequest("POST", "/v1/node/"+node.ID+"/drain?enable=true", nil) + require.Nil(err) + respW := httptest.NewRecorder() + + // Make the request + obj, err := s.Server.NodeSpecificRequest(respW, req) + require.Nil(err) + + // Check for the index + require.NotZero(respW.HeaderMap.Get("X-Nomad-Index")) + + // Check the response + _, ok := obj.(structs.NodeDrainUpdateResponse) + require.True(ok) + + // Check that the node has been updated + state := s.Agent.server.State() + out, err := state.NodeByID(nil, node.ID) + require.Nil(err) + require.True(out.Drain) + require.NotNil(out.DrainStrategy) + require.Equal(-1*time.Second, out.DrainStrategy.Deadline) + + // Make the HTTP request to unset drain + req, err = http.NewRequest("POST", "/v1/node/"+node.ID+"/drain?enable=false", nil) + require.Nil(err) + respW = httptest.NewRecorder() + + // Make the request + _, err = s.Server.NodeSpecificRequest(respW, req) + require.Nil(err) + + out, err = state.NodeByID(nil, node.ID) + require.Nil(err) + require.False(out.Drain) + require.Nil(out.DrainStrategy) + require.Equal(structs.NodeSchedulingEligible, out.SchedulingEligibility) + }) +} + func TestHTTP_NodeEligible(t *testing.T) { t.Parallel() require := require.New(t) diff --git a/nomad/fsm_test.go b/nomad/fsm_test.go index c7873e9124e..ef79d146198 100644 --- a/nomad/fsm_test.go +++ b/nomad/fsm_test.go @@ -350,6 +350,7 @@ func TestFSM_BatchUpdateNodeDrain(t *testing.T) { ws := memdb.NewWatchSet() node, err = fsm.State().NodeByID(ws, req.Node.ID) require.Nil(err) + require.True(node.Drain) require.Equal(node.DrainStrategy, strategy) require.Len(node.Events, 2) } @@ -393,10 +394,46 @@ func TestFSM_UpdateNodeDrain(t *testing.T) { ws := memdb.NewWatchSet() node, err = fsm.State().NodeByID(ws, req.Node.ID) require.Nil(err) + require.True(node.Drain) require.Equal(node.DrainStrategy, strategy) require.Len(node.Events, 2) } +func TestFSM_UpdateNodeDrain_Pre08_Compatibility(t *testing.T) { + t.Parallel() + require := require.New(t) + fsm := testFSM(t) + + // Force a node into the state store without eligiblity + node := mock.Node() + node.SchedulingEligibility = "" + require.Nil(fsm.State().UpsertNode(1, node)) + + // Do an old style drain + req := structs.NodeUpdateDrainRequest{ + NodeID: node.ID, + Drain: true, + } + buf, err := structs.Encode(structs.NodeUpdateDrainRequestType, req) + require.Nil(err) + + resp := fsm.Apply(makeLog(buf)) + require.Nil(resp) + + // Verify we have upgraded to a force drain + ws := memdb.NewWatchSet() + node, err = fsm.State().NodeByID(ws, req.NodeID) + require.Nil(err) + require.True(node.Drain) + + expected := &structs.DrainStrategy{ + DrainSpec: structs.DrainSpec{ + Deadline: -1 * time.Second, + }, + } + require.Equal(expected, node.DrainStrategy) +} + func TestFSM_UpdateNodeEligibility(t *testing.T) { t.Parallel() require := require.New(t) diff --git a/nomad/node_endpoint_test.go b/nomad/node_endpoint_test.go index d7d5844341e..6b7b90e2f40 100644 --- a/nomad/node_endpoint_test.go +++ b/nomad/node_endpoint_test.go @@ -891,10 +891,10 @@ func TestClientEndpoint_UpdateDrain(t *testing.T) { // Check for the node in the FSM state := s1.fsm.State() - ws := memdb.NewWatchSet() out, err := state.NodeByID(ws, node.ID) require.Nil(err) + require.True(out.Drain) require.Equal(strategy.Deadline, out.DrainStrategy.Deadline) require.Len(out.Events, 2) require.Equal(NodeDrainEventDrainSet, out.Events[1].Message) @@ -2682,7 +2682,7 @@ func TestClientEndpoint_ListNodes_Blocking(t *testing.T) { if resp2.Index != 3 { t.Fatalf("Bad index: %d %d", resp2.Index, 3) } - if len(resp2.Nodes) != 1 { + if len(resp2.Nodes) != 1 || !resp2.Nodes[0].Drain { t.Fatalf("bad: %#v", resp2.Nodes) } diff --git a/nomad/state/state_store_test.go b/nomad/state/state_store_test.go index 7e3539f3ec1..d21e4bbdefb 100644 --- a/nomad/state/state_store_test.go +++ b/nomad/state/state_store_test.go @@ -935,6 +935,7 @@ func TestStateStore_BatchUpdateNodeDrain(t *testing.T) { for _, id := range []string{n1.ID, n2.ID} { out, err := state.NodeByID(ws, id) require.Nil(err) + require.True(out.Drain) require.NotNil(out.DrainStrategy) require.Equal(out.DrainStrategy, expectedDrain) require.Len(out.Events, 2) @@ -977,6 +978,7 @@ func TestStateStore_UpdateNodeDrain_Node(t *testing.T) { ws = memdb.NewWatchSet() out, err := state.NodeByID(ws, node.ID) require.Nil(err) + require.True(out.Drain) require.NotNil(out.DrainStrategy) require.Equal(out.DrainStrategy, expectedDrain) require.Len(out.Events, 2) From 02d0b9274c28b66d10790f75fa3355d10483b3b1 Mon Sep 17 00:00:00 2001 From: Preetha Appan Date: Mon, 1 Jul 2019 16:46:05 -0500 Subject: [PATCH 9/9] Missed one revert of backwards compatibility for node drain --- nomad/state/state_store.go | 1 + 1 file changed, 1 insertion(+) diff --git a/nomad/state/state_store.go b/nomad/state/state_store.go index 6f4080d0ae4..9763d6c1231 100644 --- a/nomad/state/state_store.go +++ b/nomad/state/state_store.go @@ -792,6 +792,7 @@ func (s *StateStore) updateNodeDrainImpl(txn *memdb.Txn, index uint64, nodeID st } // Update the drain in the copy + copyNode.Drain = drain != nil // COMPAT: Remove in Nomad 0.10 copyNode.DrainStrategy = drain if drain != nil { copyNode.SchedulingEligibility = structs.NodeSchedulingIneligible