From 410ae593e78617144ad475cc2a39a065af924331 Mon Sep 17 00:00:00 2001 From: Alex Dadgar Date: Mon, 1 Feb 2016 13:57:35 -0800 Subject: [PATCH] Fix double pull with introduction of AllocModifyIndex --- api/nodes.go | 12 --- api/nodes_test.go | 18 ----- client/alloc_runner.go | 18 ++++- client/alloc_runner_test.go | 19 ++--- client/client.go | 107 ++++++++++++++++++++++---- client/client_test.go | 10 ++- client/util.go | 31 ++++---- client/util_test.go | 16 ++-- command/agent/node_endpoint.go | 24 ------ command/agent/node_endpoint_test.go | 54 ------------- nomad/alloc_endpoint.go | 37 +++++++++ nomad/alloc_endpoint_test.go | 44 ++++++++++- nomad/fsm_test.go | 1 + nomad/node_endpoint.go | 4 +- nomad/node_endpoint_test.go | 2 +- nomad/state/state_store.go | 2 + nomad/structs/structs.go | 17 +++- website/source/docs/http/node.html.md | 38 --------- 18 files changed, 247 insertions(+), 207 deletions(-) diff --git a/api/nodes.go b/api/nodes.go index 44fa7ecc8f7..7d4e6dba0fb 100644 --- a/api/nodes.go +++ b/api/nodes.go @@ -61,18 +61,6 @@ func (n *Nodes) Allocations(nodeID string, q *QueryOptions) ([]*Allocation, *Que return resp, qm, nil } -// ClientAllocations is used to return a lightweight list of allocations associated with a node. -// It is primarily used by the client in order to determine which allocations actually need -// an update. -func (n *Nodes) ClientAllocations(nodeID string, q *QueryOptions) (map[string]uint64, *QueryMeta, error) { - var resp map[string]uint64 - qm, err := n.client.query("/v1/node/"+nodeID+"/clientallocations", &resp, q) - if err != nil { - return nil, nil, err - } - return resp, qm, nil -} - // ForceEvaluate is used to force-evaluate an existing node. func (n *Nodes) ForceEvaluate(nodeID string, q *WriteOptions) (string, *WriteMeta, error) { var resp nodeEvalResponse diff --git a/api/nodes_test.go b/api/nodes_test.go index 81fb30db7eb..0a57321763a 100644 --- a/api/nodes_test.go +++ b/api/nodes_test.go @@ -207,24 +207,6 @@ func TestNodes_Allocations(t *testing.T) { } } -func TestNodes_ClientAllocations(t *testing.T) { - c, s := makeClient(t, nil, nil) - defer s.Stop() - nodes := c.Nodes() - - // Looking up by a non-existent node returns nothing. We - // don't check the index here because it's possible the node - // has already registered, in which case we will get a non- - // zero result anyways. - allocs, _, err := nodes.ClientAllocations("nope", nil) - if err != nil { - t.Fatalf("err: %s", err) - } - if n := len(allocs); n != 0 { - t.Fatalf("expected 0 allocs, got: %d", n) - } -} - func TestNodes_ForceEvaluate(t *testing.T) { c, s := makeClient(t, nil, func(c *testutil.TestServerConfig) { c.DevMode = true diff --git a/client/alloc_runner.go b/client/alloc_runner.go index 71de749c78d..f17a420b4d3 100644 --- a/client/alloc_runner.go +++ b/client/alloc_runner.go @@ -38,7 +38,8 @@ type AllocRunner struct { logger *log.Logger consulService *ConsulService - alloc *structs.Allocation + alloc *structs.Allocation + allocLock sync.Mutex dirtyCh chan struct{} @@ -184,6 +185,8 @@ func (r *AllocRunner) DestroyContext() error { // Alloc returns the associated allocation func (r *AllocRunner) Alloc() *structs.Allocation { + r.allocLock.Lock() + defer r.allocLock.Unlock() return r.alloc } @@ -336,6 +339,11 @@ OUTER: for { select { case update := <-r.updateCh: + // Store the updated allocation. + r.allocLock.Lock() + r.alloc = update + r.allocLock.Unlock() + // Check if we're in a terminal status if update.TerminalStatus() { break OUTER @@ -408,6 +416,14 @@ func (r *AllocRunner) Update(update *structs.Allocation) { } } +// shouldUpdate takes the AllocModifyIndex of an allocation sent from the server and +// checks if the current running allocation is behind and should be updated. +func (r *AllocRunner) shouldUpdate(serverIndex uint64) bool { + r.allocLock.Lock() + defer r.allocLock.Unlock() + return r.alloc.AllocModifyIndex < serverIndex +} + // Destroy is used to indicate that the allocation context should be destroyed func (r *AllocRunner) Destroy() { r.destroyLock.Lock() diff --git a/client/alloc_runner_test.go b/client/alloc_runner_test.go index 3d6c203bd1a..80a96685986 100644 --- a/client/alloc_runner_test.go +++ b/client/alloc_runner_test.go @@ -91,7 +91,7 @@ func TestAllocRunner_Destroy(t *testing.T) { func TestAllocRunner_Update(t *testing.T) { ctestutil.ExecCompatible(t) - upd, ar := testAllocRunner(false) + _, ar := testAllocRunner(false) // Ensure task takes some time task := ar.alloc.Job.TaskGroups[0].Tasks[0] @@ -99,27 +99,20 @@ func TestAllocRunner_Update(t *testing.T) { task.Config["args"] = []string{"10"} go ar.Run() defer ar.Destroy() - start := time.Now() // Update the alloc definition newAlloc := new(structs.Allocation) *newAlloc = *ar.alloc - newAlloc.DesiredStatus = structs.AllocDesiredStatusStop + newAlloc.Name = "FOO" + newAlloc.AllocModifyIndex++ ar.Update(newAlloc) + // Check the alloc runner stores the update allocation. testutil.WaitForResult(func() (bool, error) { - if upd.Count == 0 { - return false, nil - } - last := upd.Allocs[upd.Count-1] - return last.ClientStatus == structs.AllocClientStatusDead, nil + return ar.Alloc().Name == "FOO", nil }, func(err error) { - t.Fatalf("err: %v %#v %#v", err, upd.Allocs[0], ar.alloc.TaskStates) + t.Fatalf("err: %v %#v", err, ar.Alloc()) }) - - if time.Since(start) > 15*time.Second { - t.Fatalf("took too long to terminate") - } } func TestAllocRunner_SaveRestoreState(t *testing.T) { diff --git a/client/client.go b/client/client.go index 33f5efde9fc..c0b7cb70338 100644 --- a/client/client.go +++ b/client/client.go @@ -627,7 +627,7 @@ func (c *Client) run() { } // Watch for changes in allocations - allocUpdates := make(chan []*structs.Allocation, 1) + allocUpdates := make(chan *allocUpdates, 1) go c.watchAllocations(allocUpdates) // Create a snapshot timer @@ -642,8 +642,8 @@ func (c *Client) run() { c.logger.Printf("[ERR] client: failed to save state: %v", err) } - case allocs := <-allocUpdates: - c.runAllocs(allocs) + case update := <-allocUpdates: + c.runAllocs(update) case <-heartbeat: if err := c.updateNodeStatus(); err != nil { @@ -722,8 +722,22 @@ func (c *Client) updateAllocStatus(alloc *structs.Allocation) error { return nil } +// allocUpdates holds the results of receiving updated allocations from the +// servers. +type allocUpdates struct { + // pulled is the set of allocations that were downloaded from the servers. + pulled map[string]*structs.Allocation + + // filtered is the set of allocations that were not pulled because their + // AllocModifyIndex didn't change. + filtered map[string]struct{} +} + // watchAllocations is used to scan for updates to allocations -func (c *Client) watchAllocations(allocUpdates chan []*structs.Allocation) { +func (c *Client) watchAllocations(updates chan *allocUpdates) { + // The request and response for getting the map of allocations that should + // be running on the Node to their AllocModifyIndex which is incremented + // when the allocation is updated by the servers. req := structs.NodeSpecificRequest{ NodeID: c.Node().ID, QueryOptions: structs.QueryOptions{ @@ -731,12 +745,24 @@ func (c *Client) watchAllocations(allocUpdates chan []*structs.Allocation) { AllowStale: true, }, } - var resp structs.NodeAllocsResponse + var resp structs.NodeClientAllocsResponse + + // The request and response for pulling down the set of allocations that are + // new, or updated server side. + allocsReq := structs.AllocsGetRequest{ + QueryOptions: structs.QueryOptions{ + Region: c.config.Region, + AllowStale: true, + }, + } + var allocsResp structs.AllocsGetResponse for { - // Get the allocations, blocking for updates - resp = structs.NodeAllocsResponse{} - err := c.RPC("Node.GetAllocs", &req, &resp) + // Get the allocation modify index map, blocking for updates. We will + // use this to determine exactly what allocations need to be downloaded + // in full. + resp = structs.NodeClientAllocsResponse{} + err := c.RPC("Node.GetClientAllocs", &req, &resp) if err != nil { c.logger.Printf("[ERR] client: failed to query for node allocations: %v", err) retry := c.retryIntv(getAllocRetryIntv) @@ -755,16 +781,69 @@ func (c *Client) watchAllocations(allocUpdates chan []*structs.Allocation) { default: } - // Check for updates + // Filter all allocations whose AllocModifyIndex was not incremented. + // These are the allocations who have either not been updated, or whose + // updates are a result of the client sending an update for the alloc. + // This lets us reduce the network traffic to the server as we don't + // need to pull all the allocations. + var pull []string + filtered := make(map[string]struct{}) + c.allocLock.Lock() + for allocID, modifyIndex := range resp.Allocs { + // Pull the allocation if we don't have an alloc runner for the + // allocation or if the alloc runner requires an updated allocation. + runner, ok := c.allocs[allocID] + if !ok || runner.shouldUpdate(modifyIndex) { + pull = append(pull, allocID) + } + filtered[allocID] = struct{}{} + } + c.allocLock.Unlock() + c.logger.Printf("[DEBUG] client: updated allocations at index %d (pulled %d) (filtered %d)", + resp.Index, len(pull), len(filtered)) + + // Pull the allocations that passed filtering. + allocsResp.Allocs = nil + if len(pull) != 0 { + // Pull the allocations that need to be updated. + allocsReq.AllocIDs = pull + allocsResp = structs.AllocsGetResponse{} + if err := c.RPC("Alloc.GetAllocs", &allocsReq, &allocsResp); err != nil { + c.logger.Printf("[ERR] client: failed to query updated allocations: %v", err) + retry := c.retryIntv(getAllocRetryIntv) + select { + case <-time.After(retry): + continue + case <-c.shutdownCh: + return + } + } + + // Check for shutdown + select { + case <-c.shutdownCh: + return + default: + } + } + + // Update the query index. if resp.Index <= req.MinQueryIndex { continue } req.MinQueryIndex = resp.Index - c.logger.Printf("[DEBUG] client: updated allocations at index %d (%d allocs)", resp.Index, len(resp.Allocs)) - // Push the updates + // Push the updates. + pulled := make(map[string]*structs.Allocation, len(allocsResp.Allocs)) + for _, alloc := range allocsResp.Allocs { + pulled[alloc.ID] = alloc + } + update := &allocUpdates{ + filtered: filtered, + pulled: pulled, + } select { - case allocUpdates <- resp.Allocs: + case updates <- update: case <-c.shutdownCh: return } @@ -772,7 +851,7 @@ func (c *Client) watchAllocations(allocUpdates chan []*structs.Allocation) { } // runAllocs is invoked when we get an updated set of allocations -func (c *Client) runAllocs(updated []*structs.Allocation) { +func (c *Client) runAllocs(update *allocUpdates) { // Get the existing allocs c.allocLock.RLock() exist := make([]*structs.Allocation, 0, len(c.allocs)) @@ -782,7 +861,7 @@ func (c *Client) runAllocs(updated []*structs.Allocation) { c.allocLock.RUnlock() // Diff the existing and updated allocations - diff := diffAllocs(exist, updated) + diff := diffAllocs(exist, update) c.logger.Printf("[DEBUG] client: %#v", diff) // Remove the old allocations diff --git a/client/client_test.go b/client/client_test.go index 9e94542c188..d1b7bc87a02 100644 --- a/client/client_test.go +++ b/client/client_test.go @@ -355,10 +355,14 @@ func TestClient_WatchAllocs(t *testing.T) { t.Fatalf("err: %v", err) } - // Update the other allocation - alloc2.DesiredStatus = structs.AllocDesiredStatusStop + // Update the other allocation. Have to make a copy because the allocs are + // shared in memory in the test and the modify index would be updated in the + // alloc runner. + alloc2_2 := new(structs.Allocation) + *alloc2_2 = *alloc2 + alloc2_2.DesiredStatus = structs.AllocDesiredStatusStop err = state.UpsertAllocs(102, - []*structs.Allocation{alloc2}) + []*structs.Allocation{alloc2_2}) if err != nil { t.Fatalf("err: %v", err) } diff --git a/client/util.go b/client/util.go index 4efec9e184e..a8afcd17152 100644 --- a/client/util.go +++ b/client/util.go @@ -31,33 +31,28 @@ func (d *diffResult) GoString() string { // diffAllocs is used to diff the existing and updated allocations // to see what has happened. -func diffAllocs(existing, updated []*structs.Allocation) *diffResult { - result := &diffResult{} - - // Index the updated allocations by id - idx := make(map[string]*structs.Allocation) - for _, update := range updated { - idx[update.ID] = update - } - +func diffAllocs(existing []*structs.Allocation, allocs *allocUpdates) *diffResult { // Scan the existing allocations + result := &diffResult{} existIdx := make(map[string]struct{}) for _, exist := range existing { // Mark this as existing existIdx[exist.ID] = struct{}{} - // Check for presence in the new set - update, ok := idx[exist.ID] + // Check if the alloc was updated or filtered because an update wasn't + // needed. + alloc, pulled := allocs.pulled[exist.ID] + _, filtered := allocs.filtered[exist.ID] - // If not present, removed - if !ok { + // If not updated or filtered, removed + if !pulled && !filtered { result.removed = append(result.removed, exist) continue } // Check for an update - if update.ModifyIndex > exist.ModifyIndex { - result.updated = append(result.updated, allocTuple{exist, update}) + if pulled && alloc.AllocModifyIndex > exist.AllocModifyIndex { + result.updated = append(result.updated, allocTuple{exist, alloc}) continue } @@ -66,9 +61,9 @@ func diffAllocs(existing, updated []*structs.Allocation) *diffResult { } // Scan the updated allocations for any that are new - for _, update := range updated { - if _, ok := existIdx[update.ID]; !ok { - result.added = append(result.added, update) + for id, pulled := range allocs.pulled { + if _, ok := existIdx[id]; !ok { + result.added = append(result.added, pulled) } } return result diff --git a/client/util_test.go b/client/util_test.go index d6c165b9b9c..64413df9aff 100644 --- a/client/util_test.go +++ b/client/util_test.go @@ -17,7 +17,7 @@ func TestDiffAllocs(t *testing.T) { alloc2 := mock.Alloc() // Update alloc2u := new(structs.Allocation) *alloc2u = *alloc2 - alloc2u.ModifyIndex += 1 + alloc2u.AllocModifyIndex += 1 alloc3 := mock.Alloc() // Remove alloc4 := mock.Alloc() // Add @@ -26,13 +26,17 @@ func TestDiffAllocs(t *testing.T) { alloc2, alloc3, } - updated := []*structs.Allocation{ - alloc1, - alloc2u, - alloc4, + update := &allocUpdates{ + pulled: map[string]*structs.Allocation{ + alloc2u.ID: alloc2u, + alloc4.ID: alloc4, + }, + filtered: map[string]struct{}{ + alloc1.ID: struct{}{}, + }, } - result := diffAllocs(exist, updated) + result := diffAllocs(exist, update) if len(result.ignore) != 1 || result.ignore[0] != alloc1 { t.Fatalf("Bad: %#v", result.ignore) diff --git a/command/agent/node_endpoint.go b/command/agent/node_endpoint.go index f249212c355..c872912e06e 100644 --- a/command/agent/node_endpoint.go +++ b/command/agent/node_endpoint.go @@ -36,9 +36,6 @@ func (s *HTTPServer) NodeSpecificRequest(resp http.ResponseWriter, req *http.Req case strings.HasSuffix(path, "/evaluate"): nodeName := strings.TrimSuffix(path, "/evaluate") return s.nodeForceEvaluate(resp, req, nodeName) - case strings.HasSuffix(path, "/clientallocations"): - nodeName := strings.TrimSuffix(path, "/clientallocations") - return s.nodeClientAllocations(resp, req, nodeName) case strings.HasSuffix(path, "/allocations"): nodeName := strings.TrimSuffix(path, "/allocations") return s.nodeAllocations(resp, req, nodeName) @@ -92,27 +89,6 @@ func (s *HTTPServer) nodeAllocations(resp http.ResponseWriter, req *http.Request return out.Allocs, nil } -func (s *HTTPServer) nodeClientAllocations(resp http.ResponseWriter, req *http.Request, - nodeID string) (interface{}, error) { - if req.Method != "GET" { - return nil, CodedError(405, ErrInvalidMethod) - } - args := structs.NodeSpecificRequest{ - NodeID: nodeID, - } - if s.parse(resp, req, &args.Region, &args.QueryOptions) { - return nil, nil - } - - var out structs.NodeClientAllocsResponse - if err := s.agent.RPC("Node.GetClientAllocs", &args, &out); err != nil { - return nil, err - } - - setMeta(resp, &out.QueryMeta) - return out.Allocs, nil -} - func (s *HTTPServer) nodeToggleDrain(resp http.ResponseWriter, req *http.Request, nodeID string) (interface{}, error) { if req.Method != "PUT" && req.Method != "POST" { diff --git a/command/agent/node_endpoint_test.go b/command/agent/node_endpoint_test.go index d4ec061b9a2..a63739a18bf 100644 --- a/command/agent/node_endpoint_test.go +++ b/command/agent/node_endpoint_test.go @@ -214,60 +214,6 @@ func TestHTTP_NodeAllocations(t *testing.T) { }) } -func TestHTTP_NodeClientAllocations(t *testing.T) { - httpTest(t, nil, func(s *TestServer) { - // Create the job - node := mock.Node() - args := structs.NodeRegisterRequest{ - Node: node, - WriteRequest: structs.WriteRequest{Region: "global"}, - } - var resp structs.NodeUpdateResponse - if err := s.Agent.RPC("Node.Register", &args, &resp); err != nil { - t.Fatalf("err: %v", err) - } - - // Directly manipulate the state - state := s.Agent.server.State() - alloc1 := mock.Alloc() - alloc1.NodeID = node.ID - err := state.UpsertAllocs(1000, []*structs.Allocation{alloc1}) - if err != nil { - t.Fatalf("err: %v", err) - } - - // Make the HTTP request - req, err := http.NewRequest("GET", "/v1/node/"+node.ID+"/clientallocations", nil) - if err != nil { - t.Fatalf("err: %v", err) - } - respW := httptest.NewRecorder() - - // Make the request - obj, err := s.Server.NodeSpecificRequest(respW, req) - if err != nil { - t.Fatalf("err: %v", err) - } - - // Check for the index - if respW.HeaderMap.Get("X-Nomad-Index") == "" { - t.Fatalf("missing index") - } - if respW.HeaderMap.Get("X-Nomad-KnownLeader") != "true" { - t.Fatalf("missing known leader") - } - if respW.HeaderMap.Get("X-Nomad-LastContact") == "" { - t.Fatalf("missing last contact") - } - - // Check the node - allocs := obj.(map[string]uint64) - if len(allocs) != 1 || allocs[alloc1.ID] != 1000 { - t.Fatalf("bad: %#v", allocs) - } - }) -} - func TestHTTP_NodeDrain(t *testing.T) { httpTest(t, nil, func(s *TestServer) { // Create the node diff --git a/nomad/alloc_endpoint.go b/nomad/alloc_endpoint.go index ca65621b18b..e49e2383083 100644 --- a/nomad/alloc_endpoint.go +++ b/nomad/alloc_endpoint.go @@ -1,6 +1,7 @@ package nomad import ( + "fmt" "time" "github.com/armon/go-metrics" @@ -110,3 +111,39 @@ func (a *Alloc) GetAlloc(args *structs.AllocSpecificRequest, }} return a.srv.blockingRPC(&opts) } + +// GetAllocs is used to lookup a set of allocations +func (a *Alloc) GetAllocs(args *structs.AllocsGetRequest, + reply *structs.AllocsGetResponse) error { + if done, err := a.srv.forward("Alloc.GetAllocs", args, args, reply); done { + return err + } + defer metrics.MeasureSince([]string{"nomad", "alloc", "get_alloc"}, time.Now()) + + // Lookup the allocations + snap, err := a.srv.fsm.State().Snapshot() + if err != nil { + return err + } + + allocs := make([]*structs.Allocation, len(args.AllocIDs)) + for i, alloc := range args.AllocIDs { + out, err := snap.AllocByID(alloc) + if err != nil { + return err + } + if out == nil { + return fmt.Errorf("unknown alloc id %q", alloc) + } + + allocs[i] = out + if reply.Index < out.ModifyIndex { + reply.Index = out.ModifyIndex + } + } + + // Set the response + a.srv.setQueryMeta(&reply.QueryMeta) + reply.Allocs = allocs + return nil +} diff --git a/nomad/alloc_endpoint_test.go b/nomad/alloc_endpoint_test.go index 2f82ad6de35..b6ea20cd6a3 100644 --- a/nomad/alloc_endpoint_test.go +++ b/nomad/alloc_endpoint_test.go @@ -192,7 +192,7 @@ func TestAllocEndpoint_GetAlloc_Blocking(t *testing.T) { } }) - // Lookup the jobs + // Lookup the allocs get := &structs.AllocSpecificRequest{ AllocID: alloc2.ID, QueryOptions: structs.QueryOptions{ @@ -216,3 +216,45 @@ func TestAllocEndpoint_GetAlloc_Blocking(t *testing.T) { t.Fatalf("bad: %#v", resp.Alloc) } } + +func TestAllocEndpoint_GetAllocs(t *testing.T) { + s1 := testServer(t, nil) + defer s1.Shutdown() + codec := rpcClient(t, s1) + testutil.WaitForLeader(t, s1.RPC) + + // Create the register request + alloc := mock.Alloc() + alloc2 := mock.Alloc() + state := s1.fsm.State() + err := state.UpsertAllocs(1000, []*structs.Allocation{alloc, alloc2}) + if err != nil { + t.Fatalf("err: %v", err) + } + + // Lookup the allocs + get := &structs.AllocsGetRequest{ + AllocIDs: []string{alloc.ID, alloc2.ID}, + QueryOptions: structs.QueryOptions{Region: "global"}, + } + var resp structs.AllocsGetResponse + if err := msgpackrpc.CallWithCodec(codec, "Alloc.GetAllocs", get, &resp); err != nil { + t.Fatalf("err: %v", err) + } + if resp.Index != 1000 { + t.Fatalf("Bad index: %d %d", resp.Index, 1000) + } + + if len(resp.Allocs) != 2 { + t.Fatalf("bad: %#v", resp.Allocs) + } + + // Lookup non-existent allocs. + get = &structs.AllocsGetRequest{ + AllocIDs: []string{"foo"}, + QueryOptions: structs.QueryOptions{Region: "global"}, + } + if err := msgpackrpc.CallWithCodec(codec, "Alloc.GetAllocs", get, &resp); err == nil { + t.Fatalf("expect error") + } +} diff --git a/nomad/fsm_test.go b/nomad/fsm_test.go index ef76486a842..d9d7de61173 100644 --- a/nomad/fsm_test.go +++ b/nomad/fsm_test.go @@ -421,6 +421,7 @@ func TestFSM_UpsertAllocs(t *testing.T) { } alloc.CreateIndex = out.CreateIndex alloc.ModifyIndex = out.ModifyIndex + alloc.AllocModifyIndex = out.AllocModifyIndex if !reflect.DeepEqual(alloc, out) { t.Fatalf("bad: %#v %#v", alloc, out) } diff --git a/nomad/node_endpoint.go b/nomad/node_endpoint.go index 94ff3d4fca4..a56d842f372 100644 --- a/nomad/node_endpoint.go +++ b/nomad/node_endpoint.go @@ -387,7 +387,7 @@ func (n *Node) GetAllocs(args *structs.NodeSpecificRequest, return n.srv.blockingRPC(&opts) } -// GetClientAllocs is used to request a lightweight list of modify indexes +// GetClientAllocs is used to request a lightweight list of alloc modify indexes // per allocation. func (n *Node) GetClientAllocs(args *structs.NodeSpecificRequest, reply *structs.NodeClientAllocsResponse) error { @@ -421,7 +421,7 @@ func (n *Node) GetClientAllocs(args *structs.NodeSpecificRequest, // Setup the output if len(allocs) != 0 { for _, alloc := range allocs { - reply.Allocs[alloc.ID] = alloc.ModifyIndex + reply.Allocs[alloc.ID] = alloc.AllocModifyIndex reply.Index = maxUint64(reply.Index, alloc.ModifyIndex) } } else { diff --git a/nomad/node_endpoint_test.go b/nomad/node_endpoint_test.go index 743e6127ed2..7b2aa720aa9 100644 --- a/nomad/node_endpoint_test.go +++ b/nomad/node_endpoint_test.go @@ -666,7 +666,7 @@ func TestClientEndpoint_GetClientAllocs_Blocking(t *testing.T) { allocUpdate.NodeID = alloc.NodeID allocUpdate.ID = alloc.ID allocUpdate.ClientStatus = structs.AllocClientStatusRunning - err := state.UpdateAllocFromClient(200, allocUpdate) + err := state.UpsertAllocs(200, []*structs.Allocation{allocUpdate}) if err != nil { t.Fatalf("err: %v", err) } diff --git a/nomad/state/state_store.go b/nomad/state/state_store.go index 553bcbf0777..d097bb0d7cf 100644 --- a/nomad/state/state_store.go +++ b/nomad/state/state_store.go @@ -793,10 +793,12 @@ func (s *StateStore) UpsertAllocs(index uint64, allocs []*structs.Allocation) er if existing == nil { alloc.CreateIndex = index alloc.ModifyIndex = index + alloc.AllocModifyIndex = index } else { exist := existing.(*structs.Allocation) alloc.CreateIndex = exist.CreateIndex alloc.ModifyIndex = index + alloc.AllocModifyIndex = index alloc.ClientStatus = exist.ClientStatus alloc.ClientDescription = exist.ClientDescription } diff --git a/nomad/structs/structs.go b/nomad/structs/structs.go index fa7ac8bfc84..1e92c7d0334 100644 --- a/nomad/structs/structs.go +++ b/nomad/structs/structs.go @@ -272,6 +272,12 @@ type AllocSpecificRequest struct { QueryOptions } +// AllocsGetcRequest is used to query a set of allocations +type AllocsGetRequest struct { + AllocIDs []string + QueryOptions +} + // PeriodicForceReqeuest is used to force a specific periodic job. type PeriodicForceRequest struct { JobID string @@ -378,6 +384,12 @@ type SingleAllocResponse struct { QueryMeta } +// AllocsGetResponse is used to return a set of allocations +type AllocsGetResponse struct { + Allocs []*Allocation + QueryMeta +} + // JobAllocationsResponse is used to return the allocations for a job type JobAllocationsResponse struct { Allocations []*AllocListStub @@ -1647,8 +1659,9 @@ type Allocation struct { TaskStates map[string]*TaskState // Raft Indexes - CreateIndex uint64 - ModifyIndex uint64 + CreateIndex uint64 + ModifyIndex uint64 + AllocModifyIndex uint64 } // TerminalStatus returns if the desired or actual status is terminal and diff --git a/website/source/docs/http/node.html.md b/website/source/docs/http/node.html.md index 81c7fa137f3..794e0283abe 100644 --- a/website/source/docs/http/node.html.md +++ b/website/source/docs/http/node.html.md @@ -311,44 +311,6 @@ be specified using the `?region=` query parameter. -
-
Description
-
- Query the allocations belonging to a single node. This endpoint only returns - a map from allocation id to its modify index and is primarily used by the client - to determine which allocations need to be updated. -
- -
Method
-
GET
- -
URL
-
`/v1/node//clientallocations`
- -
Parameters
-
- None -
- -
Blocking Queries
-
- [Supported](/docs/http/index.html#blocking-queries) -
- -
Returns
-
- - ```javascript - { - "d66ea8d7-1d4c-119e-46b3-e23713a4ab72": 9, - "abf34c35-1d4c-119e-46b3-e23713a4ab72": 10 - } - ``` - -
-
- - ## PUT / POST