Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remote agent pprof endpoints #6841

Merged
merged 21 commits into from
Jan 10, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
81 changes: 81 additions & 0 deletions api/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,9 @@ package api
import (
"encoding/json"
"fmt"
"io/ioutil"
"net/url"
"strconv"
)

// Agent encapsulates an API client which talks to Nomad's
Expand Down Expand Up @@ -288,6 +290,85 @@ func (a *Agent) Monitor(stopCh <-chan struct{}, q *QueryOptions) (<-chan *Stream
return frames, errCh
}

// PprofOptions contain a set of parameters for profiling a node or server.
type PprofOptions struct {
// ServerID is the server ID, name, or special value "leader" to
// specify the server that a given profile should be run on.
ServerID string

// NodeID is the node ID that a given profile should be run on.
NodeID string

// Seconds specifies the amount of time a profile should be run for.
// Seconds only applies for certain runtime profiles like CPU and Trace.
Seconds int

// GC determines if a runtime.GC() should be called before a heap
// profile.
GC int

// Debug specifies if the output of a lookup profile should be returned
// in human readable format instead of binary.
Debug int
}

// CPUProfile returns a runtime/pprof cpu profile for a given server or node.
// The profile will run for the amount of seconds passed in or default to 1.
// If no serverID or nodeID are provided the current Agents server will be
// used.
//
// The call blocks until the profile finishes, and returns the raw bytes of the
// profile.
func (a *Agent) CPUProfile(opts PprofOptions, q *QueryOptions) ([]byte, error) {
return a.pprofRequest("profile", opts, q)
}

// Trace returns a runtime/pprof trace for a given server or node.
// The trace will run for the amount of seconds passed in or default to 1.
// If no serverID or nodeID are provided the current Agents server will be
// used.
//
// The call blocks until the profile finishes, and returns the raw bytes of the
// profile.
func (a *Agent) Trace(opts PprofOptions, q *QueryOptions) ([]byte, error) {
return a.pprofRequest("trace", opts, q)
}

// Lookup returns a runtime/pprof profile using pprof.Lookup to determine
// which profile to run. Accepts a client or server ID but not both simultaneously.
//
// The call blocks until the profile finishes, and returns the raw bytes of the
// profile unless debug is set.
func (a *Agent) Lookup(profile string, opts PprofOptions, q *QueryOptions) ([]byte, error) {
return a.pprofRequest(profile, opts, q)
}

func (a *Agent) pprofRequest(req string, opts PprofOptions, q *QueryOptions) ([]byte, error) {
if q == nil {
drewbailey marked this conversation as resolved.
Show resolved Hide resolved
q = &QueryOptions{}
}
if q.Params == nil {
q.Params = make(map[string]string)
}

q.Params["seconds"] = strconv.Itoa(opts.Seconds)
q.Params["debug"] = strconv.Itoa(opts.Debug)
q.Params["gc"] = strconv.Itoa(opts.GC)
q.Params["node_id"] = opts.NodeID
q.Params["server_id"] = opts.ServerID

body, err := a.client.rawQuery(fmt.Sprintf("/v1/agent/pprof/%s", req), q)
if err != nil {
return nil, err
}

resp, err := ioutil.ReadAll(body)
if err != nil {
return nil, err
}
return resp, nil
}

// joinResponse is used to decode the response we get while
// sending a member join request.
type joinResponse struct {
Expand Down
80 changes: 80 additions & 0 deletions api/agent_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -376,3 +376,83 @@ OUTER:
}
}
}

func TestAgentCPUProfile(t *testing.T) {
t.Parallel()

c, s, token := makeACLClient(t, nil, nil)
defer s.Stop()

agent := c.Agent()

q := &QueryOptions{
AuthToken: token.SecretID,
}

// Valid local request
{
opts := PprofOptions{
Seconds: 1,
}
resp, err := agent.CPUProfile(opts, q)
require.NoError(t, err)
require.NotNil(t, resp)
}

// Invalid server request
{
opts := PprofOptions{
Seconds: 1,
ServerID: "unknown.global",
}
resp, err := agent.CPUProfile(opts, q)
require.Error(t, err)
require.Contains(t, err.Error(), "500 (unknown nomad server unknown.global)")
require.Nil(t, resp)
}

}

func TestAgentTrace(t *testing.T) {
t.Parallel()

c, s, token := makeACLClient(t, nil, nil)
defer s.Stop()

agent := c.Agent()

q := &QueryOptions{
AuthToken: token.SecretID,
}

resp, err := agent.Trace(PprofOptions{}, q)
require.NoError(t, err)
require.NotNil(t, resp)
}

func TestAgentProfile(t *testing.T) {
t.Parallel()

c, s, token := makeACLClient(t, nil, nil)
defer s.Stop()

agent := c.Agent()

q := &QueryOptions{
AuthToken: token.SecretID,
}

{
resp, err := agent.Lookup("heap", PprofOptions{}, q)
require.NoError(t, err)
require.NotNil(t, resp)
}

// unknown profile
{
resp, err := agent.Lookup("invalid", PprofOptions{}, q)
require.Error(t, err)
require.Contains(t, err.Error(), "Unexpected response code: 404")
require.Nil(t, resp)
}
}
61 changes: 55 additions & 6 deletions client/agent_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"time"

"github.com/hashicorp/nomad/command/agent/monitor"
"github.com/hashicorp/nomad/command/agent/pprof"
"github.com/hashicorp/nomad/helper"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/ugorji/go/codec"
Expand All @@ -23,12 +24,59 @@ type Agent struct {
}

func NewAgentEndpoint(c *Client) *Agent {
m := &Agent{c: c}
m.c.streamingRpcs.Register("Agent.Monitor", m.monitor)
return m
a := &Agent{c: c}
a.c.streamingRpcs.Register("Agent.Monitor", a.monitor)
return a
}

func (m *Agent) monitor(conn io.ReadWriteCloser) {
func (a *Agent) Profile(args *structs.AgentPprofRequest, reply *structs.AgentPprofResponse) error {
// Check ACL for agent write
aclObj, err := a.c.ResolveToken(args.AuthToken)
if err != nil {
return err
} else if aclObj != nil && !aclObj.AllowAgentWrite() {
return structs.ErrPermissionDenied
}

// If ACLs are disabled, EnableDebug must be enabled
if aclObj == nil && !a.c.config.EnableDebug {
return structs.ErrPermissionDenied
}

var resp []byte
var headers map[string]string

// Determine which profile to run and generate profile.
// Blocks for args.Seconds
// Our RPC endpoints currently don't support context
// or request cancellation so stubbing with TODO
switch args.ReqType {
case pprof.CPUReq:
resp, headers, err = pprof.CPUProfile(context.TODO(), args.Seconds)
case pprof.CmdReq:
resp, headers, err = pprof.Cmdline()
case pprof.LookupReq:
resp, headers, err = pprof.Profile(args.Profile, args.Debug, args.GC)
case pprof.TraceReq:
resp, headers, err = pprof.Trace(context.TODO(), args.Seconds)
}

if err != nil {
if pprof.IsErrProfileNotFound(err) {
return structs.NewErrRPCCoded(404, err.Error())
}
return structs.NewErrRPCCoded(500, err.Error())
}

// Copy profile response to reply
reply.Payload = resp
reply.AgentID = a.c.NodeID()
reply.HTTPHeaders = headers

return nil
}

func (a *Agent) monitor(conn io.ReadWriteCloser) {
defer metrics.MeasureSince([]string{"client", "agent", "monitor"}, time.Now())
defer conn.Close()

Expand All @@ -43,7 +91,7 @@ func (m *Agent) monitor(conn io.ReadWriteCloser) {
}

// Check acl
if aclObj, err := m.c.ResolveToken(args.AuthToken); err != nil {
if aclObj, err := a.c.ResolveToken(args.AuthToken); err != nil {
handleStreamResultError(err, helper.Int64ToPtr(403), encoder)
return
} else if aclObj != nil && !aclObj.AllowAgentRead() {
Expand All @@ -64,7 +112,7 @@ func (m *Agent) monitor(conn io.ReadWriteCloser) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

monitor := monitor.New(512, m.c.logger, &log.LoggerOptions{
monitor := monitor.New(512, a.c.logger, &log.LoggerOptions{
JSONFormat: args.LogJSON,
Level: logLevel,
})
Expand All @@ -76,6 +124,7 @@ func (m *Agent) monitor(conn io.ReadWriteCloser) {

framer := sframer.NewStreamFramer(frames, 1*time.Second, 200*time.Millisecond, 1024)
framer.Run()

defer framer.Destroy()

// goroutine to detect remote side closing
Expand Down
Loading