Experimental self-profiling #2839

Merged 14 commits on Nov 29, 2019
18 changes: 18 additions & 0 deletions NOTICE.txt
@@ -1004,6 +1004,15 @@ License type (autodetected): Apache-2.0
Apache License 2.0


--------------------------------------------------------------------
Dependency: github.com/google/pprof
Revision: 27840fff0d09770c422884093a210ac5ce453ea6
License type (autodetected): Apache-2.0
./vendor/github.com/google/pprof/LICENSE:
--------------------------------------------------------------------
Apache License 2.0


--------------------------------------------------------------------
Dependency: github.com/google/shlex
Revision: c34317bd91bf98fab745d77b03933cf8769299fe
@@ -2456,6 +2465,15 @@ License type (autodetected): Apache-2.0
Apache License 2.0


--------------------------------------------------------------------
Dependency: github.com/OneOfOne/xxhash
Revision: 8f0be54a8d5ccf68817143d8c996147ec5cbd795
License type (autodetected): Apache-2.0
./vendor/github.com/OneOfOne/xxhash/LICENSE:
--------------------------------------------------------------------
Apache License 2.0


--------------------------------------------------------------------
Dependency: github.com/opencontainers/go-digest
Revision: eaa60544f31ccf3b0653b1a118b76d33418ff41b
14 changes: 14 additions & 0 deletions _meta/beat.yml
@@ -53,6 +53,20 @@ apm-server:
# secret_token for the remote apm-servers.
#secret_token:

# Enable profiling of the server, recording profile samples as events.
#
# This feature is experimental.
#profiling:
#cpu:
# Set to true to enable CPU profiling.
#enabled: false
#interval: 60s
#duration: 10s
#heap:
# Set to true to enable heap profiling.
#enabled: false
#interval: 60s

# A pipeline is a definition of processors applied to documents when ingesting them to Elasticsearch.
# Using pipelines involves two steps:
# (1) registering a pipeline
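For reference, a minimal sketch of this configuration with profiling switched on. The nesting under instrumentation follows the SelfInstrumentation.Profiling fields used in beater/api/mux.go below, and is an assumption of this sketch rather than something spelled out in the template; the interval and duration values simply repeat the defaults from the commented block above.

apm-server:
  instrumentation:
    # Self-instrumentation must be enabled for the profile route to be registered.
    enabled: true
    profiling:
      cpu:
        enabled: true
        interval: 60s
        duration: 10s
      heap:
        enabled: true
        interval: 60s
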
14 changes: 14 additions & 0 deletions apm-server.docker.yml
@@ -53,6 +53,20 @@ apm-server:
# secret_token for the remote apm-servers.
#secret_token:

# Enable profiling of the server, recording profile samples as events.
#
# This feature is experimental.
#profiling:
#cpu:
# Set to true to enable CPU profiling.
#enabled: false
#interval: 60s
#duration: 10s
#heap:
# Set to true to enable heap profiling.
#enabled: false
#interval: 60s

# A pipeline is a definition of processors applied to documents when ingesting them to Elasticsearch.
# Using pipelines involves two steps:
# (1) registering a pipeline
14 changes: 14 additions & 0 deletions apm-server.yml
@@ -53,6 +53,20 @@ apm-server:
# secret_token for the remote apm-servers.
#secret_token:

# Enable profiling of the server, recording profile samples as events.
#
# This feature is experimental.
#profiling:
#cpu:
# Set to true to enable CPU profiling.
#enabled: false
#interval: 60s
#duration: 10s
#heap:
# Set to true to enable heap profiling.
#enabled: false
#interval: 60s

# A pipeline is a definition of processors applied to documents when ingesting them to Elasticsearch.
# Using pipelines involves two steps:
# (1) registering a pipeline
18 changes: 18 additions & 0 deletions beater/api/mux.go
@@ -29,6 +29,7 @@ import (
"github.com/elastic/apm-server/beater/api/asset/sourcemap"
"github.com/elastic/apm-server/beater/api/config/agent"
"github.com/elastic/apm-server/beater/api/intake"
"github.com/elastic/apm-server/beater/api/profile"
"github.com/elastic/apm-server/beater/api/root"
"github.com/elastic/apm-server/beater/config"
"github.com/elastic/apm-server/beater/middleware"
@@ -58,6 +59,9 @@ const (
// IntakeRUMPath defines the path to ingest monitored RUM events
IntakeRUMPath = "/intake/v2/rum/events"

// ProfilePath defines the path to ingest profiles
ProfilePath = "/intake/v2/profile"

// AssetSourcemapPath defines the path to upload sourcemaps
AssetSourcemapPath = "/assets/v1/sourcemaps"
)
@@ -86,6 +90,15 @@ func NewMux(beaterConfig *config.Config, report publish.Reporter) (*http.ServeMu
{IntakePath, backendHandler},
}

// Profiling is currently experimental, and intended for profiling the
// server itself, so we only add the route if self-profiling is enabled.
if beaterConfig.SelfInstrumentation.IsEnabled() {
if beaterConfig.SelfInstrumentation.Profiling.CPU.IsEnabled() ||
beaterConfig.SelfInstrumentation.Profiling.Heap.IsEnabled() {
routeMap = append(routeMap, route{ProfilePath, profileHandler})
}
}

for _, route := range routeMap {
h, err := route.handlerFn(beaterConfig, report)
if err != nil {
@@ -103,6 +116,11 @@ func NewMux(beaterConfig *config.Config, report publish.Reporter) (*http.ServeMu
return mux, nil
}

func profileHandler(cfg *config.Config, reporter publish.Reporter) (request.Handler, error) {
h := profile.Handler(systemMetadataDecoder(cfg, emptyDecoder), transform.Config{}, reporter)
return middleware.Wrap(h, backendMiddleware(cfg, profile.MonitoringMap)...)
}

func backendHandler(cfg *config.Config, reporter publish.Reporter) (request.Handler, error) {
h := intake.Handler(systemMetadataDecoder(cfg, emptyDecoder),
&stream.Processor{
240 changes: 240 additions & 0 deletions beater/api/profile/handler.go
@@ -0,0 +1,240 @@
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package profile

import (
"fmt"
"io"
"net/http"
"strings"

pprof_profile "github.com/google/pprof/profile"
"github.com/pkg/errors"

"github.com/elastic/beats/libbeat/monitoring"

"github.com/elastic/apm-server/beater/headers"
"github.com/elastic/apm-server/beater/request"
"github.com/elastic/apm-server/decoder"
"github.com/elastic/apm-server/model/metadata"
"github.com/elastic/apm-server/model/profile"
"github.com/elastic/apm-server/publish"
"github.com/elastic/apm-server/transform"
"github.com/elastic/apm-server/utility"
"github.com/elastic/apm-server/validation"
)

var (
// MonitoringMap holds a mapping for request.IDs to monitoring counters
MonitoringMap = request.MonitoringMapForRegistry(registry)
registry = monitoring.Default.NewRegistry("apm-server.profile", monitoring.PublishExpvar)
)

const (
// TODO(axw) include messageType in pprofContentType; needs fix in agent
pprofContentType = "application/x-protobuf"
metadataContentType = "application/json"
requestContentType = "multipart/form-data"

metadataContentLengthLimit = 10 * 1024
profileContentLengthLimit = 10 * 1024 * 1024
)

// Handler returns a request.Handler for managing profile requests.
func Handler(
dec decoder.ReqDecoder,
transformConfig transform.Config,
report publish.Reporter,
) request.Handler {
handle := func(c *request.Context) (*result, error) {
if c.Request.Method != http.MethodPost {
return nil, requestError{
id: request.IDResponseErrorsMethodNotAllowed,
err: errors.New("only POST requests are supported"),
}
}
if err := validateContentType(c.Request.Header, requestContentType); err != nil {
return nil, requestError{
id: request.IDResponseErrorsValidate,
err: err,
}
}

ok := c.RateLimiter == nil || c.RateLimiter.Allow()
if !ok {
return nil, requestError{
id: request.IDResponseErrorsRateLimit,
err: errors.New("rate limit exceeded"),
}
}

// Extract metadata from the request, such as the remote address.
reqMeta, err := dec(c.Request)
if err != nil {
return nil, requestError{
id: request.IDResponseErrorsDecode,
err: errors.Wrap(err, "failed to decode request metadata"),
}
}

tctx := &transform.Context{
RequestTime: utility.RequestTime(c.Request.Context()),
Config: transformConfig,
}

var totalLimitRemaining int64 = profileContentLengthLimit
var profiles []*pprof_profile.Profile
mr, err := c.Request.MultipartReader()
if err != nil {
return nil, err
}
for {
part, err := mr.NextPart()
if err == io.EOF {
break
} else if err != nil {
return nil, err
}

switch part.FormName() {
case "metadata":
if err := validateContentType(http.Header(part.Header), metadataContentType); err != nil {
return nil, requestError{
id: request.IDResponseErrorsValidate,
err: errors.Wrap(err, "invalid metadata"),
}
}
r := &decoder.LimitedReader{R: part, N: metadataContentLengthLimit}
raw, err := decoder.DecodeJSONData(r)
if err != nil {
if r.N < 0 {
return nil, requestError{
id: request.IDResponseErrorsRequestTooLarge,
err: err,
}
}
return nil, requestError{
id: request.IDResponseErrorsDecode,
err: errors.Wrap(err, "failed to decode metadata JSON"),
}
}
for k, v := range reqMeta {
utility.InsertInMap(raw, k, v.(map[string]interface{}))
}
if err := validation.Validate(raw, metadata.ModelSchema()); err != nil {
return nil, requestError{
id: request.IDResponseErrorsValidate,
err: errors.Wrap(err, "invalid metadata"),
}
}
metadata, err := metadata.DecodeMetadata(raw)
if err != nil {
return nil, requestError{
id: request.IDResponseErrorsDecode,
err: errors.Wrap(err, "failed to decode metadata"),
}
}
tctx.Metadata = *metadata

case "profile":
if err := validateContentType(http.Header(part.Header), pprofContentType); err != nil {
return nil, requestError{
id: request.IDResponseErrorsValidate,
err: errors.Wrap(err, "invalid profile"),
}
}
r := &decoder.LimitedReader{R: part, N: totalLimitRemaining}
profile, err := pprof_profile.Parse(r)
if err != nil {
if r.N < 0 {
return nil, requestError{
id: request.IDResponseErrorsRequestTooLarge,
err: err,
}
}
return nil, requestError{
id: request.IDResponseErrorsDecode,
err: errors.Wrap(err, "failed to decode profile"),
}
}
profiles = append(profiles, profile)
totalLimitRemaining = r.N
}
}

transformables := make([]transform.Transformable, len(profiles))
for i, p := range profiles {
transformables[i] = profile.PprofProfile{Profile: p}
}

if err := report(c.Request.Context(), publish.PendingReq{
Transformables: transformables,
Tcontext: tctx,
}); err != nil {
switch err {
case publish.ErrChannelClosed:
return nil, requestError{
id: request.IDResponseErrorsShuttingDown,
err: errors.New("server is shutting down"),
}
case publish.ErrFull:
return nil, requestError{
id: request.IDResponseErrorsFullQueue,
err: err,
}
}
return nil, err
}
return &result{Accepted: len(transformables)}, nil
}
return func(c *request.Context) {
result, err := handle(c)
if err != nil {
switch err := err.(type) {
case requestError:
c.Result.SetWithError(err.id, err)
default:
c.Result.SetWithError(request.IDResponseErrorsInternal, err)
}
} else {
c.Result.SetWithBody(request.IDResponseValidAccepted, result)
}
c.Write()
}
}

func validateContentType(header http.Header, contentType string) error {
got := header.Get(headers.ContentType)
if !strings.Contains(got, contentType) {
return fmt.Errorf("invalid content type %q, expected %q", got, contentType)
}
return nil
}

type result struct {
Accepted int `json:"accepted"`
}

type requestError struct {
id request.ResultID
err error
}

func (e requestError) Error() string {
return e.err.Error()
}
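To make the expected request shape concrete, here is a minimal client sketch (not part of this change) that posts a profile to the new endpoint. The path, form field names, and part content types mirror the handler above; the server URL, metadata JSON, and profile bytes are placeholders supplied by the caller, and the metadata must still validate against the server's metadata schema (metadata.ModelSchema()).

package main

import (
	"bytes"
	"fmt"
	"mime/multipart"
	"net/http"
	"net/textproto"
)

// sendProfile posts a pprof protobuf profile plus JSON metadata to an APM
// Server, using the multipart layout expected by beater/api/profile.Handler.
// serverURL, metadataJSON, and pprofData are caller-supplied placeholders.
func sendProfile(serverURL string, metadataJSON, pprofData []byte) error {
	var body bytes.Buffer
	w := multipart.NewWriter(&body)

	// "metadata" part: must be application/json and satisfy the server's
	// metadata schema.
	mh := textproto.MIMEHeader{}
	mh.Set("Content-Disposition", `form-data; name="metadata"`)
	mh.Set("Content-Type", "application/json")
	metaPart, err := w.CreatePart(mh)
	if err != nil {
		return err
	}
	if _, err := metaPart.Write(metadataJSON); err != nil {
		return err
	}

	// "profile" part: a pprof profile in protobuf form (gzipped or not);
	// the handler parses it with github.com/google/pprof/profile.
	ph := textproto.MIMEHeader{}
	ph.Set("Content-Disposition", `form-data; name="profile"`)
	ph.Set("Content-Type", "application/x-protobuf")
	profPart, err := w.CreatePart(ph)
	if err != nil {
		return err
	}
	if _, err := profPart.Write(pprofData); err != nil {
		return err
	}

	if err := w.Close(); err != nil {
		return err
	}

	// The handler accepts only POST and checks for a multipart/form-data
	// Content-Type, which FormDataContentType provides (with the boundary).
	resp, err := http.Post(serverURL+"/intake/v2/profile", w.FormDataContentType(), &body)
	if err != nil {
		return err
	}
	defer resp.Body.Close()
	if resp.StatusCode/100 != 2 {
		return fmt.Errorf("unexpected response: %s", resp.Status)
	}
	return nil
}

On success the handler responds with a small JSON body of the form {"accepted": N}, where N is the number of profiles taken from the request.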