Skip to content

Commit

Permalink
chore: support multiple profiling types (pingcap#1095)
Browse files Browse the repository at this point in the history
  • Loading branch information
YiniXu9506 authored and baurine committed Dec 30, 2021
1 parent e9caf73 commit 6bdcdf2
Show file tree
Hide file tree
Showing 11 changed files with 414 additions and 171 deletions.
102 changes: 73 additions & 29 deletions pkg/apiserver/profiling/model.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,13 @@ package profiling

import (
"context"
"database/sql/driver"
"encoding/json"
"fmt"
"time"

"github.com/joomcode/errorx"

"github.com/pingcap/tidb-dashboard/pkg/apiserver/model"
"github.com/pingcap/tidb-dashboard/pkg/dbstore"
)
Expand All @@ -19,34 +23,68 @@ const (
TaskStateError TaskState = iota
TaskStateRunning
TaskStateFinish
TaskPartialFinish
TaskStatePartialFinish // Only valid for task group
TaskStateSkipped
)

type TaskRawDataType string

const RawDataTypeProtobuf TaskRawDataType = "protobuf"
const (
RawDataTypeProtobuf TaskRawDataType = "protobuf"
RawDataTypeText TaskRawDataType = "text"
)

type (
TaskProfilingType string
TaskProfilingTypeList []TaskProfilingType
)

func (r *TaskProfilingTypeList) Scan(src interface{}) error {
return json.Unmarshal([]byte(src.(string)), r)
}

func (r TaskProfilingTypeList) Value() (driver.Value, error) {
val, err := json.Marshal(r)
return string(val), err
}

const (
ProfilingTypeCPU TaskProfilingType = "cpu"
ProfilingTypeHeap TaskProfilingType = "heap"
ProfilingTypeGoroutine TaskProfilingType = "goroutine"
ProfilingTypeMutex TaskProfilingType = "mutex"
)

var profilingTypeMap = map[TaskProfilingType]struct{}{
ProfilingTypeCPU: {},
ProfilingTypeHeap: {},
ProfilingTypeGoroutine: {},
ProfilingTypeMutex: {},
}

type TaskModel struct {
ID uint `json:"id" gorm:"primary_key"`
TaskGroupID uint `json:"task_group_id" gorm:"index"`
State TaskState `json:"state" gorm:"index"`
Target model.RequestTargetNode `json:"target" gorm:"embedded;embedded_prefix:target_"`
FilePath string `json:"-" gorm:"type:text"`
Error string `json:"error" gorm:"type:text"`
StartedAt int64 `json:"started_at"` // The start running time, reset when retry. Used to estimate approximate profiling progress.
RawDataType TaskRawDataType `json:"raw_data_type"`
ID uint `json:"id" gorm:"primary_key"`
TaskGroupID uint `json:"task_group_id" gorm:"index"`
State TaskState `json:"state" gorm:"index"`
Target model.RequestTargetNode `json:"target" gorm:"embedded;embedded_prefix:target_"`
FilePath string `json:"-" gorm:"type:text"`
Error string `json:"error" gorm:"type:text"`
StartedAt int64 `json:"started_at"` // The start running time, reset when retry. Used to estimate approximate profiling progress.
RawDataType TaskRawDataType `json:"raw_data_type" gorm:"raw_data_type"`
ProfilingType TaskProfilingType `json:"profiling_type"`
}

func (TaskModel) TableName() string {
return "profiling_tasks"
}

type TaskGroupModel struct {
ID uint `json:"id" gorm:"primary_key"`
State TaskState `json:"state" gorm:"index"`
ProfileDurationSecs uint `json:"profile_duration_secs"`
TargetStats model.RequestTargetStatistics `json:"target_stats" gorm:"embedded;embedded_prefix:target_stats_"`
StartedAt int64 `json:"started_at"`
ID uint `json:"id" gorm:"primary_key"`
State TaskState `json:"state" gorm:"index"`
ProfileDurationSecs uint `json:"profile_duration_secs"`
TargetStats model.RequestTargetStatistics `json:"target_stats" gorm:"embedded;embedded_prefix:target_stats_"`
StartedAt int64 `json:"started_at"`
RequstedProfilingTypes TaskProfilingTypeList `json:"requsted_profiling_types"`
}

func (TaskGroupModel) TableName() string {
Expand All @@ -67,14 +105,15 @@ type Task struct {
}

// NewTask creates a new profiling task.
func NewTask(ctx context.Context, taskGroup *TaskGroup, target model.RequestTargetNode, fts *fetchers) *Task {
func NewTask(ctx context.Context, taskGroup *TaskGroup, target model.RequestTargetNode, fts *fetchers, profilingType TaskProfilingType) *Task {
ctx, cancel := context.WithCancel(ctx)
return &Task{
TaskModel: &TaskModel{
TaskGroupID: taskGroup.ID,
State: TaskStateRunning,
Target: target,
StartedAt: time.Now().Unix(),
TaskGroupID: taskGroup.ID,
State: TaskStateRunning,
Target: target,
StartedAt: time.Now().Unix(),
ProfilingType: profilingType,
},
ctx: ctx,
cancel: cancel,
Expand All @@ -84,11 +123,15 @@ func NewTask(ctx context.Context, taskGroup *TaskGroup, target model.RequestTarg
}

func (t *Task) run() {
fileNameWithoutExt := fmt.Sprintf("profiling_%d_%d_%s", t.TaskGroupID, t.ID, t.Target.FileName())
protoFilePath, rawDataType, err := profileAndWritePprof(t.ctx, t.fetchers, &t.Target, fileNameWithoutExt, t.taskGroup.ProfileDurationSecs)
fileNameWithoutExt := fmt.Sprintf("profiling_%d_%d_%s_%s", t.TaskGroupID, t.ID, t.ProfilingType, t.Target.FileName())
protoFilePath, rawDataType, err := profileAndWritePprof(t.ctx, t.fetchers, &t.Target, fileNameWithoutExt, t.taskGroup.ProfileDurationSecs, t.ProfilingType)
if err != nil {
t.Error = err.Error()
t.State = TaskStateError
if errorx.IsOfType(err, ErrUnsupportedProfilingType) {
t.State = TaskStateSkipped
} else {
t.Error = err.Error()
t.State = TaskStateError
}
t.taskGroup.db.Save(t.TaskModel)
return
}
Expand All @@ -109,13 +152,14 @@ type TaskGroup struct {
}

// NewTaskGroup create a new profiling task group.
func NewTaskGroup(db *dbstore.DB, profileDurationSecs uint, stats model.RequestTargetStatistics) *TaskGroup {
func NewTaskGroup(db *dbstore.DB, profileDurationSecs uint, stats model.RequestTargetStatistics, requestedProfilingTypes TaskProfilingTypeList) *TaskGroup {
return &TaskGroup{
TaskGroupModel: &TaskGroupModel{
State: TaskStateRunning,
ProfileDurationSecs: profileDurationSecs,
TargetStats: stats,
StartedAt: time.Now().Unix(),
State: TaskStateRunning,
ProfileDurationSecs: profileDurationSecs,
TargetStats: stats,
StartedAt: time.Now().Unix(),
RequstedProfilingTypes: requestedProfilingTypes,
},
db: db,
}
Expand Down
43 changes: 32 additions & 11 deletions pkg/apiserver/profiling/pprof.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,15 +14,16 @@ type pprofOptions struct {
duration uint
fileNameWithoutExt string

target *model.RequestTargetNode
fetcher *profileFetcher
target *model.RequestTargetNode
fetcher *profileFetcher
profilingType TaskProfilingType
}

func fetchPprof(op *pprofOptions) (string, TaskRawDataType, error) {
fetcher := &fetcher{profileFetcher: op.fetcher, target: op.target}
tmpPath, rawDataType, err := fetcher.FetchAndWriteToFile(op.duration, op.fileNameWithoutExt)
tmpPath, rawDataType, err := fetcher.FetchAndWriteToFile(op.duration, op.fileNameWithoutExt, op.profilingType)
if err != nil {
return "", "", fmt.Errorf("failed to fetch annd write to temp file: %v", err)
return "", "", fmt.Errorf("failed to fetch and write to temp file: %v", err)
}

return tmpPath, rawDataType, nil
Expand All @@ -33,8 +34,31 @@ type fetcher struct {
profileFetcher *profileFetcher
}

func (f *fetcher) FetchAndWriteToFile(duration uint, fileNameWithoutExt string) (string, TaskRawDataType, error) {
tmpfile, err := ioutil.TempFile("", fileNameWithoutExt+"*.proto")
func (f *fetcher) FetchAndWriteToFile(duration uint, fileNameWithoutExt string, profilingType TaskProfilingType) (string, TaskRawDataType, error) {
var profilingRawDataType TaskRawDataType
var fileExtenstion string
secs := strconv.Itoa(int(duration))
var url string
switch profilingType {
case ProfilingTypeCPU:
url = "/debug/pprof/profile?seconds=" + secs
profilingRawDataType = RawDataTypeProtobuf
fileExtenstion = "*.proto"
case ProfilingTypeHeap:
url = "/debug/pprof/heap"
profilingRawDataType = RawDataTypeProtobuf
fileExtenstion = "*.proto"
case ProfilingTypeGoroutine:
url = "/debug/pprof/goroutine?debug=1"
profilingRawDataType = RawDataTypeText
fileExtenstion = "*.txt"
case ProfilingTypeMutex:
url = "/debug/pprof/mutex?debug=1"
profilingRawDataType = RawDataTypeText
fileExtenstion = "*.txt"
}

tmpfile, err := ioutil.TempFile("", fileNameWithoutExt+fileExtenstion)
if err != nil {
return "", "", fmt.Errorf("failed to create tmpfile to write profile: %v", err)
}
Expand All @@ -43,18 +67,15 @@ func (f *fetcher) FetchAndWriteToFile(duration uint, fileNameWithoutExt string)
_ = tmpfile.Close()
}()

secs := strconv.Itoa(int(duration))
url := "/debug/pprof/profile?seconds=" + secs

resp, err := (*f.profileFetcher).fetch(&fetchOptions{ip: f.target.IP, port: f.target.Port, path: url})
if err != nil {
return "", "", fmt.Errorf("failed to fetch profile with proto format: %v", err)
return "", "", fmt.Errorf("failed to fetch profile with %v format: %v", fileExtenstion, err)
}

_, err = tmpfile.Write(resp)
if err != nil {
return "", "", fmt.Errorf("failed to write profile: %v", err)
}

return tmpfile.Name(), RawDataTypeProtobuf, nil
return tmpfile.Name(), profilingRawDataType, nil
}
21 changes: 14 additions & 7 deletions pkg/apiserver/profiling/profile.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,22 +4,29 @@ package profiling

import (
"context"
"fmt"

"github.com/pingcap/tidb-dashboard/pkg/apiserver/model"
)

func profileAndWritePprof(ctx context.Context, fts *fetchers, target *model.RequestTargetNode, fileNameWithoutExt string, profileDurationSecs uint) (string, TaskRawDataType, error) {
func profileAndWritePprof(ctx context.Context, fts *fetchers, target *model.RequestTargetNode, fileNameWithoutExt string, profileDurationSecs uint, profilingType TaskProfilingType) (string, TaskRawDataType, error) {
switch target.Kind {
case model.NodeKindTiKV:
return fetchPprof(&pprofOptions{duration: profileDurationSecs, fileNameWithoutExt: fileNameWithoutExt, target: target, fetcher: &fts.tikv})
// TiKV only supports CPU Profiling
if profilingType != ProfilingTypeCPU {
return "", "", ErrUnsupportedProfilingType.NewWithNoMessage()
}
return fetchPprof(&pprofOptions{duration: profileDurationSecs, fileNameWithoutExt: fileNameWithoutExt, target: target, fetcher: &fts.tikv, profilingType: profilingType})
case model.NodeKindTiFlash:
return fetchPprof(&pprofOptions{duration: profileDurationSecs, fileNameWithoutExt: fileNameWithoutExt, target: target, fetcher: &fts.tiflash})
// TiFlash only supports CPU Profiling
if profilingType != ProfilingTypeCPU {
return "", "", ErrUnsupportedProfilingType.NewWithNoMessage()
}
return fetchPprof(&pprofOptions{duration: profileDurationSecs, fileNameWithoutExt: fileNameWithoutExt, target: target, fetcher: &fts.tiflash, profilingType: profilingType})
case model.NodeKindTiDB:
return fetchPprof(&pprofOptions{duration: profileDurationSecs, fileNameWithoutExt: fileNameWithoutExt, target: target, fetcher: &fts.tidb})
return fetchPprof(&pprofOptions{duration: profileDurationSecs, fileNameWithoutExt: fileNameWithoutExt, target: target, fetcher: &fts.tidb, profilingType: profilingType})
case model.NodeKindPD:
return fetchPprof(&pprofOptions{duration: profileDurationSecs, fileNameWithoutExt: fileNameWithoutExt, target: target, fetcher: &fts.pd})
return fetchPprof(&pprofOptions{duration: profileDurationSecs, fileNameWithoutExt: fileNameWithoutExt, target: target, fetcher: &fts.pd, profilingType: profilingType})
default:
return "", "", fmt.Errorf("unsupported target %s", target)
return "", "", ErrUnsupportedProfilingTarget.New(target.String())
}
}
10 changes: 10 additions & 0 deletions pkg/apiserver/profiling/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -274,6 +274,7 @@ type ViewOutputType string
const (
ViewOutputTypeProtobuf ViewOutputType = "protobuf"
ViewOutputTypeGraph ViewOutputType = "graph"
ViewOutputTypeText ViewOutputType = "text"
)

// @ID viewProfilingSingle
Expand Down Expand Up @@ -332,6 +333,15 @@ func (s *Service) viewSingle(c *gin.Context) {
_ = c.Error(rest.ErrBadRequest.New("Cannot output protobuf as %s", outputType))
return
}
} else if task.RawDataType == RawDataTypeText {
switch outputType {
case string(ViewOutputTypeText):
contentType = "text/plain"
default:
// Will not handle converting text to other formats
_ = c.Error(rest.ErrBadRequest.New("Cannot output text as %s", outputType))
return
}
}
c.Data(http.StatusOK, contentType, content)
}
Expand Down
44 changes: 30 additions & 14 deletions pkg/apiserver/profiling/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,14 +25,17 @@ const (
)

var (
ErrNS = errorx.NewNamespace("error.profiling")
ErrIgnoredRequest = ErrNS.NewType("ignored_request")
ErrTimeout = ErrNS.NewType("timeout")
ErrNS = errorx.NewNamespace("error.api.profiling")
ErrIgnoredRequest = ErrNS.NewType("ignored_request")
ErrTimeout = ErrNS.NewType("timeout")
ErrUnsupportedProfilingType = ErrNS.NewType("unsupported_profiling_type")
ErrUnsupportedProfilingTarget = ErrNS.NewType("unsupported_profiling_target")
)

type StartRequest struct {
Targets []model.RequestTargetNode `json:"targets"`
DurationSecs uint `json:"duration_secs"`
Targets []model.RequestTargetNode `json:"targets"`
DurationSecs uint `json:"duration_secs"`
RequstedProfilingTypes TaskProfilingTypeList `json:"requsted_profiling_types"`
}

type StartRequestSession struct {
Expand Down Expand Up @@ -150,18 +153,27 @@ func (s *Service) exclusiveExecute(ctx context.Context, req *StartRequest) (*Tas
}

func (s *Service) startGroup(ctx context.Context, req *StartRequest) (*TaskGroup, error) {
taskGroup := NewTaskGroup(s.params.LocalStore, req.DurationSecs, model.NewRequestTargetStatisticsFromArray(&req.Targets))
taskGroup := NewTaskGroup(s.params.LocalStore, req.DurationSecs, model.NewRequestTargetStatisticsFromArray(&req.Targets), req.RequstedProfilingTypes)
if err := s.params.LocalStore.Create(taskGroup.TaskGroupModel).Error; err != nil {
log.Warn("failed to start task group", zap.Error(err))
return nil, err
}

tasks := make([]*Task, 0, len(req.Targets))
for _, target := range req.Targets {
t := NewTask(ctx, taskGroup, target, s.fetchers)
s.params.LocalStore.Create(t.TaskModel)
s.tasks.Store(t.ID, t)
tasks = append(tasks, t)
profileTypeList := req.RequstedProfilingTypes
for _, profilingType := range profileTypeList {
// profilingTypeMap checks the validation of requestedProfilingType.
_, valid := profilingTypeMap[profilingType]
if !valid {
return nil, ErrUnsupportedProfilingType.NewWithNoMessage()
}

t := NewTask(ctx, taskGroup, target, s.fetchers, profilingType)
s.params.LocalStore.Create(t.TaskModel)
s.tasks.Store(t.ID, t)
tasks = append(tasks, t)
}
}

s.wg.Add(1)
Expand All @@ -177,16 +189,20 @@ func (s *Service) startGroup(ctx context.Context, req *StartRequest) (*TaskGroup
}(i)
}
wg.Wait()
errorTasks := 0
finishedTasks := 0
for _, task := range tasks {
if task.State == TaskStateFinish {
if task.State == TaskStateError {
errorTasks++
} else if task.State == TaskStateFinish {
finishedTasks++
}
}
if finishedTasks == 0 {
if errorTasks > 0 {
taskGroup.State = TaskStateError
} else if finishedTasks < len(tasks) {
taskGroup.State = TaskPartialFinish
if finishedTasks > 0 {
taskGroup.State = TaskStatePartialFinish
}
} else {
taskGroup.State = TaskStateFinish
}
Expand Down
Loading

0 comments on commit 6bdcdf2

Please sign in to comment.