GetAllocs uses a blocking query
This PR makes GetAllocs use a blocking query and adds a sanity check
to the client's watchAllocations code to ensure it receives the
correct allocations.

This PR fixes #2119 and #2153.

The issue was that the client was talking to two different servers: one
to check which allocations to pull, and another to pull those
allocations. However, the latter call was not a blocking query, so if
the second server's state lagged behind the first's, the client could
fail to retrieve the allocations it had requested.

The logging has also been improved to make the problem clearer.
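
In essence, the fix makes the second call a blocking query and re-checks
the result before accepting it. The following is a minimal sketch of that
pattern only; the types, the rpc callback, and fetchAllocs are simplified
illustrative stand-ins, not Nomad's actual API:

package sketch

import "time"

// Simplified stand-ins for Nomad's structs; illustrative only.
type Allocation struct {
	ID          string
	ModifyIndex uint64
}

type AllocsGetRequest struct {
	AllocIDs      []string
	MinQueryIndex uint64
}

type AllocsGetResponse struct {
	Allocs []*Allocation
	Index  uint64
}

// fetchAllocs pulls the listed allocations, possibly from a different
// server than the one that produced listIndex. Setting MinQueryIndex to
// listIndex-1 turns the call into a blocking query: the contacted server
// cannot answer until its state index reaches at least listIndex, so it
// must have received the listed allocations. A sanity check retries if
// anything is still missing. Assumes listIndex >= 1.
func fetchAllocs(rpc func(*AllocsGetRequest, *AllocsGetResponse) error,
	ids []string, listIndex uint64) (map[string]*Allocation, error) {

	req := &AllocsGetRequest{AllocIDs: ids, MinQueryIndex: listIndex - 1}
	for {
		var resp AllocsGetResponse
		if err := rpc(req, &resp); err != nil {
			return nil, err
		}

		// Index what we actually received.
		pulled := make(map[string]*Allocation, len(resp.Allocs))
		for _, a := range resp.Allocs {
			pulled[a.ID] = a
		}

		// Sanity check: did we get everything we asked for?
		complete := true
		for _, id := range ids {
			if _, ok := pulled[id]; !ok {
				complete = false
				break
			}
		}
		if complete {
			return pulled, nil
		}
		// Do not advance MinQueryIndex; give the server time to catch up.
		time.Sleep(2 * time.Second)
	}
}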
dadgar committed Jan 10, 2017
1 parent 35af1ed commit 925622a
Showing 3 changed files with 150 additions and 35 deletions.
35 changes: 27 additions & 8 deletions client/client.go
@@ -1274,14 +1274,13 @@ func (c *Client) watchAllocations(updates chan *allocUpdates) {
}
}

c.logger.Printf("[DEBUG] client: updated allocations at index %d (pulled %d) (filtered %d)",
resp.Index, len(pull), len(filtered))

// Pull the allocations that passed filtering.
allocsResp.Allocs = nil
var pulledAllocs map[string]*structs.Allocation
if len(pull) != 0 {
// Pull the allocations that need to be updated.
allocsReq.AllocIDs = pull
allocsReq.MinQueryIndex = resp.Index - 1
allocsResp = structs.AllocsGetResponse{}
if err := c.RPC("Alloc.GetAllocs", &allocsReq, &allocsResp); err != nil {
c.logger.Printf("[ERR] client: failed to query updated allocations: %v", err)
@@ -1296,6 +1295,27 @@ func (c *Client) watchAllocations(updates chan *allocUpdates) {
}
}

// Ensure that we received all the allocations we wanted
pulledAllocs = make(map[string]*structs.Allocation, len(allocsResp.Allocs))
for _, alloc := range allocsResp.Allocs {
pulledAllocs[alloc.ID] = alloc
}

for _, desiredID := range pull {
if _, ok := pulledAllocs[desiredID]; !ok {
// We didn't get everything we wanted. Do not update the
// MinQueryIndex, sleep and then retry.
select {
case <-time.After(2 * time.Second):
// Wait for the server we contact to receive the
// allocations
continue
case <-c.shutdownCh:
return
}
}
}

// Check for shutdown
select {
case <-c.shutdownCh:
@@ -1304,19 +1324,18 @@ func (c *Client) watchAllocations(updates chan *allocUpdates) {
}
}

c.logger.Printf("[DEBUG] client: updated allocations at index %d (total %d) (pulled %d) (filtered %d)",
resp.Index, len(resp.Allocs), len(allocsResp.Allocs), len(filtered))

// Update the query index.
if resp.Index > req.MinQueryIndex {
req.MinQueryIndex = resp.Index
}

// Push the updates.
pulled := make(map[string]*structs.Allocation, len(allocsResp.Allocs))
for _, alloc := range allocsResp.Allocs {
pulled[alloc.ID] = alloc
}
update := &allocUpdates{
filtered: filtered,
pulled: pulled,
pulled: pulledAllocs,
}
select {
case updates <- update:
90 changes: 65 additions & 25 deletions nomad/alloc_endpoint.go
@@ -1,7 +1,6 @@
package nomad

import (
"fmt"
"time"

"github.com/armon/go-metrics"
@@ -97,7 +96,7 @@ func (a *Alloc) GetAlloc(args *structs.AllocSpecificRequest,
if out != nil {
reply.Index = out.ModifyIndex
} else {
// Use the last index that affected the nodes table
// Use the last index that affected the allocs table
index, err := snap.Index("allocs")
if err != nil {
return err
@@ -118,32 +117,73 @@ func (a *Alloc) GetAllocs(args *structs.AllocsGetRequest,
if done, err := a.srv.forward("Alloc.GetAllocs", args, args, reply); done {
return err
}
defer metrics.MeasureSince([]string{"nomad", "alloc", "get_alloc"}, time.Now())
defer metrics.MeasureSince([]string{"nomad", "alloc", "get_allocs"}, time.Now())

// Lookup the allocations
snap, err := a.srv.fsm.State().Snapshot()
if err != nil {
return err
// Build the watch
items := make([]watch.Item, 0, len(args.AllocIDs))
for _, allocID := range args.AllocIDs {
items = append(items, watch.Item{Alloc: allocID})
}

allocs := make([]*structs.Allocation, len(args.AllocIDs))
for i, alloc := range args.AllocIDs {
out, err := snap.AllocByID(alloc)
if err != nil {
return err
}
if out == nil {
return fmt.Errorf("unknown alloc id %q", alloc)
}

allocs[i] = out
if reply.Index < out.ModifyIndex {
reply.Index = out.ModifyIndex
}
}

// Set the response
a.srv.setQueryMeta(&reply.QueryMeta)
reply.Allocs = allocs
return nil
// Setup the blocking query. We wait for at least one of the requested
// allocations to be above the min query index. This guarantees that the
// server has received that index.
opts := blockingOptions{
queryOpts: &args.QueryOptions,
queryMeta: &reply.QueryMeta,
watch: watch.NewItems(items...),
run: func() error {
// Lookup the allocation
snap, err := a.srv.fsm.State().Snapshot()
if err != nil {
return err
}

thresholdMet := false
maxIndex := uint64(0)
for i, alloc := range args.AllocIDs {
out, err := snap.AllocByID(alloc)
if err != nil {
return err
}
if out == nil {
// We don't have the alloc yet
thresholdMet = false
break
}

// Store the pointer
allocs[i] = out

// Check if we have passed the minimum index
if out.ModifyIndex > args.QueryOptions.MinQueryIndex {
thresholdMet = true
}

if maxIndex < out.ModifyIndex {
maxIndex = out.ModifyIndex
}
}

// Setup the output
if thresholdMet {
reply.Allocs = allocs
reply.Index = maxIndex
} else {
// Use the last index that affected the allocs table
index, err := snap.Index("allocs")
if err != nil {
return err
}
reply.Index = index
}

// Set the query response
a.srv.setQueryMeta(&reply.QueryMeta)
return nil
},
}
return a.srv.blockingRPC(&opts)
}
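
The blockingRPC helper invoked above is not part of this diff. As a
rough, hedged sketch of its behavior only (not Nomad's actual
implementation, which also handles default and maximum wait times,
request forwarding, and watch registration), a blocking RPC re-runs the
query whenever watched state changes and returns once the reply index
passes the client's MinQueryIndex or the wait expires:

package sketch

import "time"

// blockingQuery is a conceptual stand-in for a.srv.blockingRPC(&opts).
// run executes the query, fills in the reply as a side effect, and
// returns the index the reply was computed at.
func blockingQuery(minIndex uint64, maxWait time.Duration,
	notify <-chan struct{}, run func() (uint64, error)) error {

	deadline := time.Now().Add(maxWait)
	for {
		index, err := run()
		if err != nil {
			return err
		}
		// Return once the state has advanced past the client's index,
		// or once the allotted wait has elapsed.
		if index > minIndex || !time.Now().Before(deadline) {
			return nil
		}
		select {
		case <-notify: // a watched item changed; re-run the query
		case <-time.After(time.Until(deadline)): // wait expired
		}
	}
}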
60 changes: 58 additions & 2 deletions nomad/alloc_endpoint_test.go
@@ -248,8 +248,10 @@ func TestAllocEndpoint_GetAllocs(t *testing.T) {

// Lookup the allocs
get := &structs.AllocsGetRequest{
AllocIDs: []string{alloc.ID, alloc2.ID},
QueryOptions: structs.QueryOptions{Region: "global"},
AllocIDs: []string{alloc.ID, alloc2.ID},
QueryOptions: structs.QueryOptions{
Region: "global",
},
}
var resp structs.AllocsGetResponse
if err := msgpackrpc.CallWithCodec(codec, "Alloc.GetAllocs", get, &resp); err != nil {
@@ -272,3 +274,57 @@
t.Fatalf("expect error")
}
}

func TestAllocEndpoint_GetAllocs_Blocking(t *testing.T) {
s1 := testServer(t, nil)
defer s1.Shutdown()
state := s1.fsm.State()
codec := rpcClient(t, s1)
testutil.WaitForLeader(t, s1.RPC)

// Create the allocs
alloc1 := mock.Alloc()
alloc2 := mock.Alloc()

// First create an alloc at an index below the query's MinQueryIndex
time.AfterFunc(100*time.Millisecond, func() {
state.UpsertJobSummary(99, mock.JobSummary(alloc1.JobID))
err := state.UpsertAllocs(100, []*structs.Allocation{alloc1})
if err != nil {
t.Fatalf("err: %v", err)
}
})

// Create the alloc we are watching later
time.AfterFunc(200*time.Millisecond, func() {
state.UpsertJobSummary(199, mock.JobSummary(alloc2.JobID))
err := state.UpsertAllocs(200, []*structs.Allocation{alloc2})
if err != nil {
t.Fatalf("err: %v", err)
}
})

// Lookup the allocs
get := &structs.AllocsGetRequest{
AllocIDs: []string{alloc1.ID, alloc2.ID},
QueryOptions: structs.QueryOptions{
Region: "global",
MinQueryIndex: 150,
},
}
var resp structs.AllocsGetResponse
start := time.Now()
if err := msgpackrpc.CallWithCodec(codec, "Alloc.GetAllocs", get, &resp); err != nil {
t.Fatalf("err: %v", err)
}

if elapsed := time.Since(start); elapsed < 200*time.Millisecond {
t.Fatalf("should block (returned in %s) %#v", elapsed, resp)
}
if resp.Index != 200 {
t.Fatalf("Bad index: %d %d", resp.Index, 200)
}
if len(resp.Allocs) != 2 {
t.Fatalf("bad: %#v", resp.Allocs)
}
}
