Skip to content

Commit

Permalink
Merge pull request #366 from hashicorp/f-blocking
Browse files Browse the repository at this point in the history
Support blocking queries
  • Loading branch information
ryanuber committed Nov 4, 2015
2 parents 1870f9b + 2a1577e commit 7f63be4
Show file tree
Hide file tree
Showing 13 changed files with 1,609 additions and 446 deletions.
119 changes: 69 additions & 50 deletions nomad/alloc_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (

"github.com/armon/go-metrics"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/hashicorp/nomad/nomad/watch"
)

// Alloc endpoint is used for manipulating allocations
Expand All @@ -19,35 +20,45 @@ func (a *Alloc) List(args *structs.AllocListRequest, reply *structs.AllocListRes
}
defer metrics.MeasureSince([]string{"nomad", "alloc", "list"}, time.Now())

// Capture all the allocations
snap, err := a.srv.fsm.State().Snapshot()
if err != nil {
return err
}
iter, err := snap.Allocs()
if err != nil {
return err
}
// Setup the blocking query
opts := blockingOptions{
queryOpts: &args.QueryOptions,
queryMeta: &reply.QueryMeta,
watch: watch.NewItems(watch.Item{Table: "allocs"}),
run: func() error {
// Capture all the allocations
snap, err := a.srv.fsm.State().Snapshot()
if err != nil {
return err
}
iter, err := snap.Allocs()
if err != nil {
return err
}

for {
raw := iter.Next()
if raw == nil {
break
}
alloc := raw.(*structs.Allocation)
reply.Allocations = append(reply.Allocations, alloc.Stub())
}
var allocs []*structs.AllocListStub
for {
raw := iter.Next()
if raw == nil {
break
}
alloc := raw.(*structs.Allocation)
allocs = append(allocs, alloc.Stub())
}
reply.Allocations = allocs

// Use the last index that affected the jobs table
index, err := snap.Index("allocs")
if err != nil {
return err
}
reply.Index = index
// Use the last index that affected the jobs table
index, err := snap.Index("allocs")
if err != nil {
return err
}
reply.Index = index

// Set the query response
a.srv.setQueryMeta(&reply.QueryMeta)
return nil
// Set the query response
a.srv.setQueryMeta(&reply.QueryMeta)
return nil
}}
return a.srv.blockingRPC(&opts)
}

// GetAlloc is used to lookup a particular allocation
Expand All @@ -58,30 +69,38 @@ func (a *Alloc) GetAlloc(args *structs.AllocSpecificRequest,
}
defer metrics.MeasureSince([]string{"nomad", "alloc", "get_alloc"}, time.Now())

// Lookup the allocation
snap, err := a.srv.fsm.State().Snapshot()
if err != nil {
return err
}
out, err := snap.AllocByID(args.AllocID)
if err != nil {
return err
}
// Setup the blocking query
opts := blockingOptions{
queryOpts: &args.QueryOptions,
queryMeta: &reply.QueryMeta,
watch: watch.NewItems(watch.Item{Alloc: args.AllocID}),
run: func() error {
// Lookup the allocation
snap, err := a.srv.fsm.State().Snapshot()
if err != nil {
return err
}
out, err := snap.AllocByID(args.AllocID)
if err != nil {
return err
}

// Setup the output
if out != nil {
reply.Alloc = out
reply.Index = out.ModifyIndex
} else {
// Use the last index that affected the nodes table
index, err := snap.Index("allocs")
if err != nil {
return err
}
reply.Index = index
}
// Setup the output
reply.Alloc = out
if out != nil {
reply.Index = out.ModifyIndex
} else {
// Use the last index that affected the nodes table
index, err := snap.Index("allocs")
if err != nil {
return err
}
reply.Index = index
}

// Set the query response
a.srv.setQueryMeta(&reply.QueryMeta)
return nil
// Set the query response
a.srv.setQueryMeta(&reply.QueryMeta)
return nil
}}
return a.srv.blockingRPC(&opts)
}
121 changes: 121 additions & 0 deletions nomad/alloc_endpoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package nomad
import (
"reflect"
"testing"
"time"

"github.com/hashicorp/net-rpc-msgpackrpc"
"github.com/hashicorp/nomad/nomad/mock"
Expand Down Expand Up @@ -44,6 +45,74 @@ func TestAllocEndpoint_List(t *testing.T) {
}
}

func TestAllocEndpoint_List_Blocking(t *testing.T) {
s1 := testServer(t, nil)
defer s1.Shutdown()
state := s1.fsm.State()
codec := rpcClient(t, s1)
testutil.WaitForLeader(t, s1.RPC)

// Create the alloc
alloc := mock.Alloc()

// Upsert alloc triggers watches
time.AfterFunc(100*time.Millisecond, func() {
if err := state.UpsertAllocs(2, []*structs.Allocation{alloc}); err != nil {
t.Fatalf("err: %v", err)
}
})

req := &structs.AllocListRequest{
QueryOptions: structs.QueryOptions{
Region: "global",
MinQueryIndex: 1,
},
}
start := time.Now()
var resp structs.AllocListResponse
if err := msgpackrpc.CallWithCodec(codec, "Alloc.List", req, &resp); err != nil {
t.Fatalf("err: %v", err)
}

if elapsed := time.Since(start); elapsed < 100*time.Millisecond {
t.Fatalf("should block (returned in %s) %#v", elapsed, resp)
}
if resp.Index != 2 {
t.Fatalf("Bad index: %d %d", resp.Index, 2)
}
if len(resp.Allocations) != 1 || resp.Allocations[0].ID != alloc.ID {
t.Fatalf("bad: %#v", resp.Allocations)
}

// Client updates trigger watches
alloc2 := mock.Alloc()
alloc2.ID = alloc.ID
alloc2.ClientStatus = structs.AllocClientStatusRunning
time.AfterFunc(100*time.Millisecond, func() {
if err := state.UpdateAllocFromClient(3, alloc2); err != nil {
t.Fatalf("err: %v", err)
}
})

req.MinQueryIndex = 2
start = time.Now()
var resp2 structs.AllocListResponse
if err := msgpackrpc.CallWithCodec(codec, "Alloc.List", req, &resp2); err != nil {
t.Fatalf("err: %v", err)
}

if elapsed := time.Since(start); elapsed < 100*time.Millisecond {
t.Fatalf("should block (returned in %s) %#v", elapsed, resp2)
}
if resp2.Index != 3 {
t.Fatalf("Bad index: %d %d", resp2.Index, 3)
}
if len(resp2.Allocations) != 1 || resp.Allocations[0].ID != alloc.ID ||
resp2.Allocations[0].ClientStatus != structs.AllocClientStatusRunning {
t.Fatalf("bad: %#v", resp2.Allocations)
}
}

func TestAllocEndpoint_GetAlloc(t *testing.T) {
s1 := testServer(t, nil)
defer s1.Shutdown()
Expand Down Expand Up @@ -75,3 +144,55 @@ func TestAllocEndpoint_GetAlloc(t *testing.T) {
t.Fatalf("bad: %#v", resp.Alloc)
}
}

func TestAllocEndpoint_GetAlloc_Blocking(t *testing.T) {
s1 := testServer(t, nil)
defer s1.Shutdown()
state := s1.fsm.State()
codec := rpcClient(t, s1)
testutil.WaitForLeader(t, s1.RPC)

// Create the allocs
alloc1 := mock.Alloc()
alloc2 := mock.Alloc()

// First create an unrelated alloc
time.AfterFunc(100*time.Millisecond, func() {
err := state.UpsertAllocs(100, []*structs.Allocation{alloc1})
if err != nil {
t.Fatalf("err: %v", err)
}
})

// Create the alloc we are watching later
time.AfterFunc(200*time.Millisecond, func() {
err := state.UpsertAllocs(200, []*structs.Allocation{alloc2})
if err != nil {
t.Fatalf("err: %v", err)
}
})

// Lookup the jobs
get := &structs.AllocSpecificRequest{
AllocID: alloc2.ID,
QueryOptions: structs.QueryOptions{
Region: "global",
MinQueryIndex: 50,
},
}
var resp structs.SingleAllocResponse
start := time.Now()
if err := msgpackrpc.CallWithCodec(codec, "Alloc.GetAlloc", get, &resp); err != nil {
t.Fatalf("err: %v", err)
}

if elapsed := time.Since(start); elapsed < 200*time.Millisecond {
t.Fatalf("should block (returned in %s) %#v", elapsed, resp)
}
if resp.Index != 200 {
t.Fatalf("Bad index: %d %d", resp.Index, 200)
}
if resp.Alloc == nil || resp.Alloc.ID != alloc2.ID {
t.Fatalf("bad: %#v", resp.Alloc)
}
}
Loading

0 comments on commit 7f63be4

Please sign in to comment.