Add profile model and endpoint
axw committed Nov 20, 2019
1 parent 8d88990 commit 630c4a3
Showing 13 changed files with 1,212 additions and 6 deletions.
12 changes: 12 additions & 0 deletions _meta/beat.yml
@@ -53,6 +53,18 @@ apm-server:
    # secret_token for the remote apm-servers.
    #secret_token:

    # Enable profiling of the server, recording profile samples as events.
    #profiling:
      #cpu:
        # Set to true to enable CPU profiling.
        #enabled: false
        #interval: 60s
        #duration: 10s
      #heap:
        # Set to true to enable heap profiling.
        #enabled: false
        #interval: 60s

  # A pipeline is a definition of processors applied to documents when ingesting them to Elasticsearch.
  # Using pipelines involves two steps:
  # (1) registering a pipeline
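For anyone trying the feature out, a minimal enabled form of the new settings might look like the sketch below. It assumes the profiling block sits under the existing instrumentation section (the secret_token context lines above belong to it) and reuses the default values from the comments; only the profiling/cpu/heap/enabled/interval/duration key names shown in the diff are certain. The same block is added verbatim to apm-server.docker.yml and apm-server.yml below.

    apm-server:
      instrumentation:
        # Self-instrumentation must be enabled for profiling to take effect
        # (see the route gating in beater/api/mux.go below).
        enabled: true
        profiling:
          cpu:
            enabled: true
            interval: 60s  # start a new CPU profile every 60 seconds
            duration: 10s  # each CPU profile records for 10 seconds
          heap:
            enabled: true
            interval: 60s  # record heap profile samples every 60 seconds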
12 changes: 12 additions & 0 deletions apm-server.docker.yml
@@ -53,6 +53,18 @@ apm-server:
    # secret_token for the remote apm-servers.
    #secret_token:

    # Enable profiling of the server, recording profile samples as events.
    #profiling:
      #cpu:
        # Set to true to enable CPU profiling.
        #enabled: false
        #interval: 60s
        #duration: 10s
      #heap:
        # Set to true to enable heap profiling.
        #enabled: false
        #interval: 60s

  # A pipeline is a definition of processors applied to documents when ingesting them to Elasticsearch.
  # Using pipelines involves two steps:
  # (1) registering a pipeline
12 changes: 12 additions & 0 deletions apm-server.yml
@@ -53,6 +53,18 @@ apm-server:
    # secret_token for the remote apm-servers.
    #secret_token:

    # Enable profiling of the server, recording profile samples as events.
    #profiling:
      #cpu:
        # Set to true to enable CPU profiling.
        #enabled: false
        #interval: 60s
        #duration: 10s
      #heap:
        # Set to true to enable heap profiling.
        #enabled: false
        #interval: 60s

  # A pipeline is a definition of processors applied to documents when ingesting them to Elasticsearch.
  # Using pipelines involves two steps:
  # (1) registering a pipeline
18 changes: 18 additions & 0 deletions beater/api/mux.go
@@ -27,6 +27,7 @@ import (
"github.com/elastic/apm-server/beater/api/asset/sourcemap"
"github.com/elastic/apm-server/beater/api/config/agent"
"github.com/elastic/apm-server/beater/api/intake"
"github.com/elastic/apm-server/beater/api/profile"
"github.com/elastic/apm-server/beater/api/root"
"github.com/elastic/apm-server/beater/config"
"github.com/elastic/apm-server/beater/middleware"
@@ -57,6 +58,9 @@ const (
	// IntakeRUMPath defines the path to ingest monitored RUM events
	IntakeRUMPath = "/intake/v2/rum/events"

	// ProfilePath defines the path to ingest profiles
	ProfilePath = "/intake/v2/profile"

	// AssetSourcemapPath defines the path to upload sourcemaps
	AssetSourcemapPath = "/assets/v1/sourcemaps"
)
@@ -85,6 +89,15 @@ func NewMux(beaterConfig *config.Config, report publish.Reporter) (*http.ServeMu
		{IntakePath, backendHandler},
	}

	// Profiling is currently experimental, and intended for profiling the
	// server itself, so we only add the route if self-profiling is enabled.
	if beaterConfig.SelfInstrumentation.IsEnabled() {
		if beaterConfig.SelfInstrumentation.Profiling.CPU.IsEnabled() ||
			beaterConfig.SelfInstrumentation.Profiling.Heap.IsEnabled() {
			routeMap = append(routeMap, route{ProfilePath, profileHandler})
		}
	}

	for _, route := range routeMap {
		h, err := route.handlerFn(beaterConfig, report)
		if err != nil {
@@ -102,6 +115,11 @@ func NewMux(beaterConfig *config.Config, report publish.Reporter) (*http.ServeMu
	return mux, nil
}

func profileHandler(cfg *config.Config, reporter publish.Reporter) (request.Handler, error) {
	h := profile.Handler(systemMetadataDecoder(cfg, emptyDecoder), transform.Config{}, reporter)
	return middleware.Wrap(h, backendMiddleware(cfg, profile.MonitoringMap)...)
}

func backendHandler(cfg *config.Config, reporter publish.Reporter) (request.Handler, error) {
	h := intake.Handler(systemMetadataDecoder(cfg, emptyDecoder),
		&stream.Processor{
265 changes: 265 additions & 0 deletions beater/api/profile/handler.go
@@ -0,0 +1,265 @@
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package profile

import (
	"fmt"
	"io"
	"net/http"
	"strings"

	pprof_profile "github.com/google/pprof/profile"
	"github.com/pkg/errors"

	"github.com/elastic/beats/libbeat/monitoring"

	"github.com/elastic/apm-server/beater/headers"
	"github.com/elastic/apm-server/beater/request"
	"github.com/elastic/apm-server/decoder"
	"github.com/elastic/apm-server/model/metadata"
	"github.com/elastic/apm-server/model/profile"
	"github.com/elastic/apm-server/publish"
	"github.com/elastic/apm-server/transform"
	"github.com/elastic/apm-server/utility"
	"github.com/elastic/apm-server/validation"
)

var (
	// MonitoringMap holds a mapping for request.IDs to monitoring counters
	MonitoringMap = request.MonitoringMapForRegistry(registry)
	registry      = monitoring.Default.NewRegistry("apm-server.profile", monitoring.PublishExpvar)
)

const (
	// TODO(axw) include messageType in pprofContentType; needs fix in agent
	pprofContentType    = "application/x-protobuf"
	metadataContentType = "application/json"
	requestContentType  = "multipart/form-data"
)

// Handler returns a request.Handler for managing profile requests.
func Handler(
	dec decoder.ReqDecoder,
	transformConfig transform.Config,
	report publish.Reporter,
) request.Handler {
	handle := func(c *request.Context) (*result, error) {
		if c.Request.Method != http.MethodPost {
			return nil, requestError{
				id:  request.IDResponseErrorsMethodNotAllowed,
				err: errors.New("only POST requests are supported"),
			}
		}
		if err := validateContentType(c.Request.Header, requestContentType); err != nil {
			return nil, requestError{
				id:  request.IDResponseErrorsValidate,
				err: err,
			}
		}

		ok := c.RateLimiter == nil || c.RateLimiter.Allow()
		if !ok {
			return nil, requestError{
				id:  request.IDResponseErrorsRateLimit,
				err: errors.New("rate limit exceeded"),
			}
		}

		// Extract metadata from the request, like user-agent and remote address.
		reqMeta, err := dec(c.Request)
		if err != nil {
			return nil, requestError{
				id:  request.IDResponseErrorsDecode,
				err: errors.Wrap(err, "failed to decode request metadata"),
			}
		}

		tctx := &transform.Context{
			RequestTime: utility.RequestTime(c.Request.Context()),
			Config:      transformConfig,
		}

		var totalLimitRemaining int64 = 10 * 1024 * 1024 // 10 MiB ought to be enough for anybody
		var profiles []*pprof_profile.Profile
		mr, err := c.Request.MultipartReader()
		if err != nil {
			return nil, err
		}
		for {
			part, err := mr.NextPart()
			if err == io.EOF {
				break
			} else if err != nil {
				return nil, err
			}

			switch part.FormName() {
			case "metadata":
				if err := validateContentType(http.Header(part.Header), metadataContentType); err != nil {
					return nil, requestError{
						id:  request.IDResponseErrorsValidate,
						err: errors.Wrap(err, "invalid metadata"),
					}
				}
				r := &limitedReader{r: part, n: 10 * 1024 /* 10 KiB ought to be enough for anybody */}
				raw, err := decoder.DecodeJSONData(r)
				if err != nil {
					if err, ok := r.err.(requestError); ok {
						return nil, err
					}
					return nil, requestError{
						id:  request.IDResponseErrorsDecode,
						err: errors.Wrap(err, "failed to decode metadata JSON"),
					}
				}
				for k, v := range reqMeta {
					utility.InsertInMap(raw, k, v.(map[string]interface{}))
				}
				if err := validation.Validate(raw, metadata.ModelSchema()); err != nil {
					return nil, requestError{
						id:  request.IDResponseErrorsValidate,
						err: errors.Wrap(err, "invalid metadata"),
					}
				}
				metadata, err := metadata.DecodeMetadata(raw)
				if err != nil {
					return nil, requestError{
						id:  request.IDResponseErrorsDecode,
						err: errors.Wrap(err, "failed to decode metadata"),
					}
				}
				tctx.Metadata = *metadata

			case "profile":
				if err := validateContentType(http.Header(part.Header), pprofContentType); err != nil {
					return nil, requestError{
						id:  request.IDResponseErrorsValidate,
						err: errors.Wrap(err, "invalid profile"),
					}
				}
				r := &limitedReader{r: part, n: totalLimitRemaining}
				profile, err := pprof_profile.Parse(r)
				if err != nil {
					if err, ok := r.err.(requestError); ok {
						return nil, err
					}
					return nil, requestError{
						id:  request.IDResponseErrorsDecode,
						err: errors.Wrap(err, "failed to decode profile"),
					}
				}
				profiles = append(profiles, profile)
				totalLimitRemaining = r.n
			}
		}

		transformables := make([]transform.Transformable, len(profiles))
		for i, p := range profiles {
			transformables[i] = profile.PprofProfile{Profile: p}
		}

		if err := report(c.Request.Context(), publish.PendingReq{
			Transformables: transformables,
			Tcontext:       tctx,
		}); err != nil {
			switch err {
			case publish.ErrChannelClosed:
				return nil, requestError{
					id:  request.IDResponseErrorsShuttingDown,
					err: errors.New("server is shutting down"),
				}
			case publish.ErrFull:
				return nil, requestError{
					id:  request.IDResponseErrorsFullQueue,
					err: err,
				}
			}
			return nil, err
		}
		return &result{Accepted: len(transformables)}, nil
	}
	return func(c *request.Context) {
		result, err := handle(c)
		if err != nil {
			switch err := err.(type) {
			case requestError:
				c.Result.SetWithError(err.id, err)
			default:
				c.Result.SetWithError(request.IDResponseErrorsInternal, err)
			}
		} else {
			c.Result.SetWithBody(request.IDResponseValidAccepted, result)
		}
		c.Write()
	}
}

func validateContentType(header http.Header, contentType string) error {
	got := header.Get(headers.ContentType)
	if !strings.Contains(got, contentType) {
		return fmt.Errorf("invalid content type %q, expected %q", got, contentType)
	}
	return nil
}

// limitedReader is like io.LimitedReader, but returns a
// requestError upon detecting a request that is too large.
//
// Based on net/http.maxBytesReader.
type limitedReader struct {
	r   io.Reader
	n   int64
	err error
}

func (l *limitedReader) Read(p []byte) (n int, err error) {
	if l.err != nil || len(p) == 0 {
		return 0, l.err
	}
	if int64(len(p)) > l.n+1 {
		p = p[:l.n+1]
	}
	n, err = l.r.Read(p)

	if int64(n) <= l.n {
		l.n -= int64(n)
		l.err = err
		return n, err
	}

	n = int(l.n)
	l.n = 0
	l.err = requestError{
		id:  request.IDResponseErrorsRequestTooLarge,
		err: errors.New("too large"),
	}
	return n, l.err
}

type result struct {
	Accepted int `json:"accepted"`
}

type requestError struct {
	id  request.ResultID
	err error
}

func (e requestError) Error() string {
	return e.err.Error()
}
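To illustrate what this handler expects on the wire — a multipart/form-data body with a JSON "metadata" part and one or more pprof "profile" parts — here is a minimal client sketch. The localhost URL and the metadata field values are illustrative assumptions, not part of this change; the required metadata fields are whatever metadata.ModelSchema() demands.

	package main

	import (
		"bytes"
		"fmt"
		"mime/multipart"
		"net/http"
		"net/textproto"
		"runtime/pprof"
	)

	func main() {
		var body bytes.Buffer
		mw := multipart.NewWriter(&body)

		// "metadata" part: JSON, matching metadataContentType in the handler.
		// The service/agent values below are placeholders.
		h := make(textproto.MIMEHeader)
		h.Set("Content-Disposition", `form-data; name="metadata"`)
		h.Set("Content-Type", "application/json")
		part, err := mw.CreatePart(h)
		if err != nil {
			panic(err)
		}
		part.Write([]byte(`{"service":{"name":"example","agent":{"name":"example","version":"0.0.0"}}}`))

		// "profile" part: a gzipped pprof protobuf, matching pprofContentType.
		// runtime/pprof writes the heap profile in exactly that format.
		h = make(textproto.MIMEHeader)
		h.Set("Content-Disposition", `form-data; name="profile"`)
		h.Set("Content-Type", "application/x-protobuf")
		part, err = mw.CreatePart(h)
		if err != nil {
			panic(err)
		}
		if err := pprof.WriteHeapProfile(part); err != nil {
			panic(err)
		}
		mw.Close()

		// FormDataContentType yields "multipart/form-data; boundary=...",
		// which satisfies the handler's validateContentType check.
		resp, err := http.Post("http://localhost:8200/intake/v2/profile", mw.FormDataContentType(), &body)
		if err != nil {
			panic(err)
		}
		defer resp.Body.Close()
		fmt.Println(resp.Status) // expect an accepted response with body {"accepted":1}
	}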
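And a quick sketch of limitedReader's boundary behaviour, as a hypothetical in-package example (the type is unexported, so this would have to live next to the handler):

	// Reading 16 bytes through a limitedReader capped at 10: the first Read
	// returns 10 bytes, and crossing the cap surfaces a requestError that
	// maps to IDResponseErrorsRequestTooLarge.
	func ExampleLimitedReader() {
		r := &limitedReader{r: strings.NewReader("0123456789abcdef"), n: 10}
		data, err := ioutil.ReadAll(r)
		fmt.Printf("read %d bytes, err: %v\n", len(data), err)
		// Output: read 10 bytes, err: too large
	}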