Skip to content

Commit

Permalink
add compoonent-level diagnostics (#80)
Browse files Browse the repository at this point in the history
  • Loading branch information
fearful-symmetry authored Jul 20, 2023
1 parent 8eee171 commit 10662ba
Show file tree
Hide file tree
Showing 12 changed files with 566 additions and 273 deletions.
13 changes: 13 additions & 0 deletions elastic-agent-client.proto
Original file line number Diff line number Diff line change
Expand Up @@ -261,6 +261,16 @@ message ActionRequest {
// Defined diagnostics action.
DIAGNOSTICS = 1;
}
// The level that the action is operating on.
// Currently only used for diagnostics.
enum Level {
// All diagnostics
ALL = 0;
// Component level action
COMPONENT = 1;
// Unit level action
UNIT = 2;
}
// Unique ID of the action.
string id = 1;
// Name of the action (name is ignored for DIAGNOSTICS).
Expand All @@ -273,6 +283,9 @@ message ActionRequest {
UnitType unit_type = 5;
// Type of action to be performed (only used with V2).
Type type = 6;
// Level marks the action as either operating on a component, or a unit.
// If level=component, then the consumer should ignore the unit_id and unit_type fields.
Level level = 7;
}

message ActionDiagnosticUnitResult {
Expand Down
132 changes: 100 additions & 32 deletions pkg/client/client_v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,11 @@ type V2 interface {
// a specific unit. Registering the hook at the client level means it will be called for every unit that has
// diagnostics requested.
RegisterDiagnosticHook(name string, description string, filename string, contentType string, hook DiagnosticHook)
// RegisterOptionalDiagnosticHook is the same as RegisterDiagnosticHook, but marks a given callback as optional, and tied to a given
// paramater tag that is specified in the `params` field of the V2 Action request. The diagnostic will not be run unless the
// diagnostic action event contains that paramiter tag. See ActionRequest in elastic-agent-client.proto and DiagnosticParams in
// diagnostics.go
RegisterOptionalDiagnosticHook(paramTag string, name string, description string, filename string, contentType string, hook DiagnosticHook)
}

// clientV2 manages the state and communication to the Elastic Agent over the V2 control protocol.
Expand Down Expand Up @@ -283,10 +288,27 @@ func (c *clientV2) RegisterDiagnosticHook(name string, description string, filen
c.dmx.Lock()
defer c.dmx.Unlock()
c.diagHooks[name] = diagHook{
description: description,
filename: filename,
contentType: contentType,
hook: hook,
description: description,
filename: filename,
contentType: contentType,
hook: hook,
optionalWithParamTag: "",
}
}

// RegisterOptionalDiagnosticHook is the same as RegisterDiagnosticHook, but marks a given callback as optional, and tied to a given
// paramater tag that is specified in the `params` field of the V2 Action request. The diagnostic will not be run unless the
// diagnostic action event contains that paramiter tag. See ActionRequest in elastic-agent-client.proto and DiagnosticParams in
// diagnostics.go
func (c *clientV2) RegisterOptionalDiagnosticHook(paramTag string, name string, description string, filename string, contentType string, hook DiagnosticHook) {
c.dmx.Lock()
defer c.dmx.Unlock()
c.diagHooks[name] = diagHook{
description: description,
filename: filename,
contentType: contentType,
hook: hook,
optionalWithParamTag: paramTag,
}
}

Expand Down Expand Up @@ -796,52 +818,85 @@ func (c *clientV2) tryPerformAction(actionResults chan *proto.ActionResponse, ac
}

func (c *clientV2) tryPerformDiagnostics(actionResults chan *proto.ActionResponse, action *proto.ActionRequest) {
// find the unit
c.unitsMu.RLock()
unit := c.findUnit(action.UnitId, UnitType(action.UnitType))
c.unitsMu.RUnlock()
if unit == nil {
actionResults <- &proto.ActionResponse{
Token: c.token,
Id: action.Id,
Status: proto.ActionResponse_FAILED,
Result: ActionErrUnitNotFound,
// see if we can unpack params
foundParams := map[string]bool{}
if len(action.GetParams()) > 0 {
params := DiagnosticParams{}
err := json.Unmarshal(action.GetParams(), &params)
if err != nil {
actionResults <- &proto.ActionResponse{
Token: c.token,
Id: action.GetId(),
Status: proto.ActionResponse_FAILED,
Result: []byte(fmt.Sprintf("error unmarshaling json in params: %s", err)),
}
}
// convert to a map for easier checking
for _, param := range params.AdditionalMetrics {
foundParams[param] = true
}
return
}

// gather diagnostics hooks for this unit
// break apart the action:
// if it's unit-level, fetch unit-level diagnostics.
// if component-level, just run diagnostics that are registered at the client level.
diagHooks := make(map[string]diagHook)
c.dmx.RLock()
for n, d := range c.diagHooks {
diagHooks[n] = d
if action.GetLevel() == proto.ActionRequest_COMPONENT || action.GetLevel() == proto.ActionRequest_ALL {
c.dmx.RLock()
for n, d := range c.diagHooks {
diagHooks[n] = d
}
c.dmx.RUnlock()

}
c.dmx.RUnlock()

// unit hooks after client hooks; allows unit specific to override client hooks
unit.dmx.RLock()
for n, d := range unit.diagHooks {
diagHooks[n] = d
if action.GetLevel() == proto.ActionRequest_UNIT || action.GetLevel() == proto.ActionRequest_ALL {
// find the unit
c.unitsMu.RLock()
unit := c.findUnit(action.GetUnitId(), UnitType(action.GetUnitType()))
c.unitsMu.RUnlock()
if unit == nil {
actionResults <- &proto.ActionResponse{
Token: c.token,
Id: action.GetId(),
Status: proto.ActionResponse_FAILED,
Result: ActionErrUnitNotFound,
}
return
}

// gather diagnostics hooks for this unit
unit.dmx.RLock()
for n, d := range unit.diagHooks {
diagHooks[n] = d
}
unit.dmx.RUnlock()
}
unit.dmx.RUnlock()

// perform diagnostics in goroutine, so we don't block other work
go func() {
res := make([]*proto.ActionDiagnosticUnitResult, 0, len(diagHooks))
for n, d := range diagHooks {
content := d.hook()
for diagName, diagHook := range diagHooks {
// if the hook came with a tag, check it.
// if the callback was registered with a tag but none was found in the request, skip it
if diagHook.optionalWithParamTag != "" {
if _, ok := foundParams[diagHook.optionalWithParamTag]; !ok {
continue
}
}
content := diagHook.hook()
res = append(res, &proto.ActionDiagnosticUnitResult{
Name: n,
Filename: d.filename,
Description: d.description,
ContentType: d.contentType,
Name: diagName,
Filename: diagHook.filename,
Description: diagHook.description,
ContentType: diagHook.contentType,
Content: content,
Generated: timestamppb.New(time.Now().UTC()),
})
}
actionResults <- &proto.ActionResponse{
Token: c.token,
Id: action.Id,
Id: action.GetId(),
Status: proto.ActionResponse_SUCCESS,
Diagnostic: res,
}
Expand All @@ -855,6 +910,19 @@ func (c *clientV2) registerDefaultDiagnostics() {
c.registerPprofDiagnostics("threadcreate", "stack traces that led to the creation of new OS threads", 0)
c.registerPprofDiagnostics("block", "stack traces that led to blocking on synchronization primitives", 0)
c.registerPprofDiagnostics("mutex", "stack traces of holders of contended mutexes", 0)

c.RegisterOptionalDiagnosticHook("CPU", "cpu-pprof", "CPU profile", "cpu.pprof", "application/octet-stream", createCPUProfile)
}

func createCPUProfile() []byte {
var writeBuf bytes.Buffer
err := pprof.StartCPUProfile(&writeBuf)
if err != nil {
return []byte(fmt.Sprintf("error starting CPU profile: %s", err))
}
time.Sleep(time.Second * 30)
pprof.StopCPUProfile()
return writeBuf.Bytes()
}

func (c *clientV2) registerPprofDiagnostics(name string, description string, debug int) {
Expand Down
Loading

0 comments on commit 10662ba

Please sign in to comment.