Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

GetAllocs uses a blocking query #2177

Merged
merged 2 commits into from
Jan 11, 2017
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 27 additions & 8 deletions client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -1274,14 +1274,13 @@ func (c *Client) watchAllocations(updates chan *allocUpdates) {
}
}

c.logger.Printf("[DEBUG] client: updated allocations at index %d (pulled %d) (filtered %d)",
resp.Index, len(pull), len(filtered))

// Pull the allocations that passed filtering.
allocsResp.Allocs = nil
var pulledAllocs map[string]*structs.Allocation
if len(pull) != 0 {
// Pull the allocations that need to be updated.
allocsReq.AllocIDs = pull
allocsReq.MinQueryIndex = resp.Index - 1
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why -1?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The allocations I want are already at that index and MinQueryIndex waits for the index to be greater than the passed index.

allocsResp = structs.AllocsGetResponse{}
if err := c.RPC("Alloc.GetAllocs", &allocsReq, &allocsResp); err != nil {
c.logger.Printf("[ERR] client: failed to query updated allocations: %v", err)
Expand All @@ -1296,6 +1295,27 @@ func (c *Client) watchAllocations(updates chan *allocUpdates) {
}
}

// Ensure that we received all the allocations we wanted
pulledAllocs = make(map[string]*structs.Allocation, len(allocsResp.Allocs))
for _, alloc := range allocsResp.Allocs {
pulledAllocs[alloc.ID] = alloc
}

for _, desiredID := range pull {
if _, ok := pulledAllocs[desiredID]; !ok {
// We didn't get everything we wanted. Do not update the
// MinQueryIndex, sleep and then retry.
select {
case <-time.After(2 * time.Second):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we add a jitter here to prevent thundering hards across the cluster?

// Wait for the server we contact to receive the
// allocations
continue
case <-c.shutdownCh:
return
}
}
}

// Check for shutdown
select {
case <-c.shutdownCh:
Expand All @@ -1304,19 +1324,18 @@ func (c *Client) watchAllocations(updates chan *allocUpdates) {
}
}

c.logger.Printf("[DEBUG] client: updated allocations at index %d (total %d) (pulled %d) (filtered %d)",
resp.Index, len(resp.Allocs), len(allocsResp.Allocs), len(filtered))

// Update the query index.
if resp.Index > req.MinQueryIndex {
req.MinQueryIndex = resp.Index
}

// Push the updates.
pulled := make(map[string]*structs.Allocation, len(allocsResp.Allocs))
for _, alloc := range allocsResp.Allocs {
pulled[alloc.ID] = alloc
}
update := &allocUpdates{
filtered: filtered,
pulled: pulled,
pulled: pulledAllocs,
}
select {
case updates <- update:
Expand Down
90 changes: 65 additions & 25 deletions nomad/alloc_endpoint.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package nomad

import (
"fmt"
"time"

"github.com/armon/go-metrics"
Expand Down Expand Up @@ -97,7 +96,7 @@ func (a *Alloc) GetAlloc(args *structs.AllocSpecificRequest,
if out != nil {
reply.Index = out.ModifyIndex
} else {
// Use the last index that affected the nodes table
// Use the last index that affected the allocs table
index, err := snap.Index("allocs")
if err != nil {
return err
Expand All @@ -118,32 +117,73 @@ func (a *Alloc) GetAllocs(args *structs.AllocsGetRequest,
if done, err := a.srv.forward("Alloc.GetAllocs", args, args, reply); done {
return err
}
defer metrics.MeasureSince([]string{"nomad", "alloc", "get_alloc"}, time.Now())
defer metrics.MeasureSince([]string{"nomad", "alloc", "get_allocs"}, time.Now())

// Lookup the allocations
snap, err := a.srv.fsm.State().Snapshot()
if err != nil {
return err
// Build the watch
items := make([]watch.Item, 0, len(args.AllocIDs))
for _, allocID := range args.AllocIDs {
items = append(items, watch.Item{Alloc: allocID})
}

allocs := make([]*structs.Allocation, len(args.AllocIDs))
for i, alloc := range args.AllocIDs {
out, err := snap.AllocByID(alloc)
if err != nil {
return err
}
if out == nil {
return fmt.Errorf("unknown alloc id %q", alloc)
}

allocs[i] = out
if reply.Index < out.ModifyIndex {
reply.Index = out.ModifyIndex
}
}

// Set the response
a.srv.setQueryMeta(&reply.QueryMeta)
reply.Allocs = allocs
return nil
// Setup the blocking query. We wait for at least one of the requested
// allocations to be above the min query index. This guarantees that the
// server has received that index.
opts := blockingOptions{
queryOpts: &args.QueryOptions,
queryMeta: &reply.QueryMeta,
watch: watch.NewItems(items...),
run: func() error {
// Lookup the allocation
snap, err := a.srv.fsm.State().Snapshot()
if err != nil {
return err
}

thresholdMet := false
maxIndex := uint64(0)
for i, alloc := range args.AllocIDs {
out, err := snap.AllocByID(alloc)
if err != nil {
return err
}
if out == nil {
// We don't have the alloc yet
thresholdMet = false
break
}

// Store the pointer
allocs[i] = out

// Check if we have passed the minimum index
if out.ModifyIndex > args.QueryOptions.MinQueryIndex {
thresholdMet = true
}

if maxIndex < out.ModifyIndex {
maxIndex = out.ModifyIndex
}
}

// Setup the output
if thresholdMet {
reply.Allocs = allocs
reply.Index = maxIndex
} else {
// Use the last index that affected the nodes table
index, err := snap.Index("allocs")
if err != nil {
return err
}
reply.Index = index
}

// Set the query response
a.srv.setQueryMeta(&reply.QueryMeta)
return nil
},
}
return a.srv.blockingRPC(&opts)
}
60 changes: 58 additions & 2 deletions nomad/alloc_endpoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -248,8 +248,10 @@ func TestAllocEndpoint_GetAllocs(t *testing.T) {

// Lookup the allocs
get := &structs.AllocsGetRequest{
AllocIDs: []string{alloc.ID, alloc2.ID},
QueryOptions: structs.QueryOptions{Region: "global"},
AllocIDs: []string{alloc.ID, alloc2.ID},
QueryOptions: structs.QueryOptions{
Region: "global",
},
}
var resp structs.AllocsGetResponse
if err := msgpackrpc.CallWithCodec(codec, "Alloc.GetAllocs", get, &resp); err != nil {
Expand All @@ -272,3 +274,57 @@ func TestAllocEndpoint_GetAllocs(t *testing.T) {
t.Fatalf("expect error")
}
}

func TestAllocEndpoint_GetAllocs_Blocking(t *testing.T) {
s1 := testServer(t, nil)
defer s1.Shutdown()
state := s1.fsm.State()
codec := rpcClient(t, s1)
testutil.WaitForLeader(t, s1.RPC)

// Create the allocs
alloc1 := mock.Alloc()
alloc2 := mock.Alloc()

// First create an unrelated alloc
time.AfterFunc(100*time.Millisecond, func() {
state.UpsertJobSummary(99, mock.JobSummary(alloc1.JobID))
err := state.UpsertAllocs(100, []*structs.Allocation{alloc1})
if err != nil {
t.Fatalf("err: %v", err)
}
})

// Create the alloc we are watching later
time.AfterFunc(200*time.Millisecond, func() {
state.UpsertJobSummary(199, mock.JobSummary(alloc2.JobID))
err := state.UpsertAllocs(200, []*structs.Allocation{alloc2})
if err != nil {
t.Fatalf("err: %v", err)
}
})

// Lookup the allocs
get := &structs.AllocsGetRequest{
AllocIDs: []string{alloc1.ID, alloc2.ID},
QueryOptions: structs.QueryOptions{
Region: "global",
MinQueryIndex: 150,
},
}
var resp structs.AllocsGetResponse
start := time.Now()
if err := msgpackrpc.CallWithCodec(codec, "Alloc.GetAllocs", get, &resp); err != nil {
t.Fatalf("err: %v", err)
}

if elapsed := time.Since(start); elapsed < 200*time.Millisecond {
t.Fatalf("should block (returned in %s) %#v", elapsed, resp)
}
if resp.Index != 200 {
t.Fatalf("Bad index: %d %d", resp.Index, 200)
}
if len(resp.Allocs) != 2 {
t.Fatalf("bad: %#v", resp.Allocs)
}
}