Skip to content

Commit

Permalink
test allocation stats rpc forwarding
Browse files Browse the repository at this point in the history
  • Loading branch information
Mahmood Ali committed Mar 16, 2020
1 parent 646df47 commit a421a84
Showing 1 changed file with 112 additions and 0 deletions.
112 changes: 112 additions & 0 deletions nomad/client_alloc_endpoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"fmt"
"io"
"net"
"net/rpc"
"strings"
"testing"
"time"
Expand Down Expand Up @@ -1065,6 +1066,117 @@ func TestClientAllocations_Restart_ACL(t *testing.T) {
}
}

// TestAlloc_RPC_Forwarding asserts that non-streaming RPC task requests are forwarded
// to appropriate server or remote regions. Uses Stats as a method
func TestAlloc_RPC_Forwarding(t *testing.T) {
t.Parallel()

////// Nomad clusters topology - not specific to test
localServer, cleanupLS := TestServer(t, nil)
defer cleanupLS()

remoteServer, cleanupRS := TestServer(t, func(c *Config) {
c.DevDisableBootstrap = true
})
defer cleanupRS()

remoteRegionServer, cleanupRRS := TestServer(t, func(c *Config) {
c.Region = "two"
})
defer cleanupRRS()

TestJoin(t, localServer, remoteServer)
TestJoin(t, localServer, remoteRegionServer)
testutil.WaitForLeader(t, localServer.RPC)
testutil.WaitForLeader(t, remoteServer.RPC)
testutil.WaitForLeader(t, remoteRegionServer.RPC)

c, cleanup := client.TestClient(t, func(c *config.Config) {
c.Servers = []string{localServer.config.RPCAddr.String()}
})
defer cleanup()

// Wait for the client to connect
testutil.WaitForResult(func() (bool, error) {
nodes := remoteServer.connectedNodes()
return len(nodes) == 1, nil
}, func(err error) {
require.NoError(t, err, "failed to have a client")
})

// Force remove the connection locally in case it exists
remoteServer.nodeConnsLock.Lock()
delete(remoteServer.nodeConns, c.NodeID())
remoteServer.nodeConnsLock.Unlock()

///// Start task
a := mock.BatchAlloc()
a.NodeID = c.NodeID()
a.Job.Type = structs.JobTypeBatch
a.Job.TaskGroups[0].Count = 1
a.Job.TaskGroups[0].Tasks[0].Config = map[string]interface{}{
"run_for": "30s",
}

// Upsert the allocation
localState := localServer.State()
require.Nil(t, localState.UpsertJob(999, a.Job))
require.Nil(t, localState.UpsertAllocs(1003, []*structs.Allocation{a}))
remoteState := remoteServer.State()
require.Nil(t, remoteState.UpsertJob(999, a.Job))
require.Nil(t, remoteState.UpsertAllocs(1003, []*structs.Allocation{a}))

// Wait for the client to run the allocation
testutil.WaitForResult(func() (bool, error) {
alloc, err := localState.AllocByID(nil, a.ID)
if err != nil {
return false, err
}
if alloc == nil {
return false, fmt.Errorf("unknown alloc")
}
if alloc.ClientStatus != structs.AllocClientStatusRunning {
return false, fmt.Errorf("alloc client status: %v", alloc.ClientStatus)
}

return true, nil
}, func(err error) {
require.NoError(t, err, "task didn't start yet")
})

///////// Actually run query now
cases := []struct {
name string
codec rpc.ClientCodec
}{
{"local_server", rpcClient(t, localServer)},
{"remote_server", rpcClient(t, remoteServer)},
{"remote_region", rpcClient(t, remoteRegionServer)},
}

taskName := a.Job.TaskGroups[0].Tasks[0].Name

for _, tc := range cases {
t.Run(tc.name, func(t *testing.T) {
req := &cstructs.AllocStatsRequest{
AllocID: a.ID,
Task: taskName,
QueryOptions: structs.QueryOptions{
Region: "global",
},
}

var resp cstructs.AllocStatsResponse
err := msgpackrpc.CallWithCodec(tc.codec, "ClientAllocations.Stats", req, &resp)
require.NoError(t, err)

t.Logf("received stats: %#+v", resp.Stats)
require.NotNil(t, resp.Stats)
require.Contains(t, resp.Stats.Tasks, taskName)
})
}
}

// TestAlloc_ExecStreaming asserts that exec task requests are forwarded
// to appropriate server or remote regions
func TestAlloc_ExecStreaming(t *testing.T) {
Expand Down

0 comments on commit a421a84

Please sign in to comment.