diff --git a/beater/api/mux.go b/beater/api/mux.go
index 1515b58c588..6a9ab22030e 100644
--- a/beater/api/mux.go
+++ b/beater/api/mux.go
@@ -29,6 +29,7 @@ import (
 	"github.com/elastic/apm-server/beater/api/asset/sourcemap"
 	"github.com/elastic/apm-server/beater/api/config/agent"
 	"github.com/elastic/apm-server/beater/api/intake"
+	"github.com/elastic/apm-server/beater/api/profile"
 	"github.com/elastic/apm-server/beater/api/root"
 	"github.com/elastic/apm-server/beater/config"
 	"github.com/elastic/apm-server/beater/middleware"
@@ -58,6 +59,9 @@ const (
 	// IntakeRUMPath defines the path to ingest monitored RUM events
 	IntakeRUMPath = "/intake/v2/rum/events"
 
+	// ProfilePath defines the path to ingest profiles
+	ProfilePath = "/intake/v2/profile"
+
 	// AssetSourcemapPath defines the path to upload sourcemaps
 	AssetSourcemapPath = "/assets/v1/sourcemaps"
 )
@@ -86,6 +90,15 @@ func NewMux(beaterConfig *config.Config, report publish.Reporter) (*http.ServeMu
 		{IntakePath, backendHandler},
 	}
 
+	// Profiling is currently experimental, and intended for profiling the
+	// server itself, so we only add the route if self-profiling is enabled.
+	if beaterConfig.SelfInstrumentation.IsEnabled() {
+		if beaterConfig.SelfInstrumentation.Profiling.CPU.IsEnabled() ||
+			beaterConfig.SelfInstrumentation.Profiling.Heap.IsEnabled() {
+			routeMap = append(routeMap, route{ProfilePath, profileHandler})
+		}
+	}
+
 	for _, route := range routeMap {
 		h, err := route.handlerFn(beaterConfig, report)
 		if err != nil {
@@ -103,6 +116,11 @@ func NewMux(beaterConfig *config.Config, report publish.Reporter) (*http.ServeMu
 	return mux, nil
 }
 
+func profileHandler(cfg *config.Config, reporter publish.Reporter) (request.Handler, error) {
+	h := profile.Handler(systemMetadataDecoder(cfg, emptyDecoder), transform.Config{}, reporter)
+	return middleware.Wrap(h, backendMiddleware(cfg, profile.MonitoringMap)...)
+}
+
 func backendHandler(cfg *config.Config, reporter publish.Reporter) (request.Handler, error) {
 	h := intake.Handler(systemMetadataDecoder(cfg, emptyDecoder),
 		&stream.Processor{
diff --git a/beater/api/profile/handler.go b/beater/api/profile/handler.go
new file mode 100644
index 00000000000..0c1902fdfe8
--- /dev/null
+++ b/beater/api/profile/handler.go
@@ -0,0 +1,265 @@
+// Licensed to Elasticsearch B.V. under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Elasticsearch B.V. licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package profile
+
+import (
+	"fmt"
+	"io"
+	"net/http"
+	"strings"
+
+	pprof_profile "github.com/google/pprof/profile"
+	"github.com/pkg/errors"
+
+	"github.com/elastic/beats/libbeat/monitoring"
+
+	"github.com/elastic/apm-server/beater/headers"
+	"github.com/elastic/apm-server/beater/request"
+	"github.com/elastic/apm-server/decoder"
+	"github.com/elastic/apm-server/model/metadata"
+	"github.com/elastic/apm-server/model/profile"
+	"github.com/elastic/apm-server/publish"
+	"github.com/elastic/apm-server/transform"
+	"github.com/elastic/apm-server/utility"
+	"github.com/elastic/apm-server/validation"
+)
+
+var (
+	// MonitoringMap holds a mapping for request.IDs to monitoring counters
+	MonitoringMap = request.MonitoringMapForRegistry(registry)
+	registry      = monitoring.Default.NewRegistry("apm-server.profile", monitoring.PublishExpvar)
+)
+
+const (
+	// TODO(axw) include messageType in pprofContentType; needs fix in agent
+	pprofContentType    = "application/x-protobuf"
+	metadataContentType = "application/json"
+	requestContentType  = "multipart/form-data"
+)
+
+// Handler returns a request.Handler for managing profile requests.
+func Handler(
+	dec decoder.ReqDecoder,
+	transformConfig transform.Config,
+	report publish.Reporter,
+) request.Handler {
+	handle := func(c *request.Context) (*result, error) {
+		if c.Request.Method != http.MethodPost {
+			return nil, requestError{
+				id:  request.IDResponseErrorsMethodNotAllowed,
+				err: errors.New("only POST requests are supported"),
+			}
+		}
+		if err := validateContentType(c.Request.Header, requestContentType); err != nil {
+			return nil, requestError{
+				id:  request.IDResponseErrorsValidate,
+				err: err,
+			}
+		}
+
+		ok := c.RateLimiter == nil || c.RateLimiter.Allow()
+		if !ok {
+			return nil, requestError{
+				id:  request.IDResponseErrorsRateLimit,
+				err: errors.New("rate limit exceeded"),
+			}
+		}
+
+		// Extract metadata from the request, like user-agent and remote address.
+		reqMeta, err := dec(c.Request)
+		if err != nil {
+			return nil, requestError{
+				id:  request.IDResponseErrorsDecode,
+				err: errors.Wrap(err, "failed to decode request metadata"),
+			}
+		}
+
+		tctx := &transform.Context{
+			RequestTime: utility.RequestTime(c.Request.Context()),
+			Config:      transformConfig,
+		}
+
+		var totalLimitRemaining int64 = 10 * 1024 * 1024 // 10 MiB ought to be enough for anybody
+		var profiles []*pprof_profile.Profile
+		mr, err := c.Request.MultipartReader()
+		if err != nil {
+			return nil, err
+		}
+		for {
+			part, err := mr.NextPart()
+			if err == io.EOF {
+				break
+			} else if err != nil {
+				return nil, err
+			}
+
+			switch part.FormName() {
+			case "metadata":
+				if err := validateContentType(http.Header(part.Header), metadataContentType); err != nil {
+					return nil, requestError{
+						id:  request.IDResponseErrorsValidate,
+						err: errors.Wrap(err, "invalid metadata"),
+					}
+				}
+				r := &limitedReader{r: part, n: 10 * 1024 /* 10 KiB ought to be enough for anybody */}
+				raw, err := decoder.DecodeJSONData(r)
+				if err != nil {
+					if err, ok := r.err.(requestError); ok {
+						return nil, err
+					}
+					return nil, requestError{
+						id:  request.IDResponseErrorsDecode,
+						err: errors.Wrap(err, "failed to decode metadata JSON"),
+					}
+				}
+				for k, v := range reqMeta {
+					utility.InsertInMap(raw, k, v.(map[string]interface{}))
+				}
+				if err := validation.Validate(raw, metadata.ModelSchema()); err != nil {
+					return nil, requestError{
+						id:  request.IDResponseErrorsValidate,
+						err: errors.Wrap(err, "invalid metadata"),
+					}
+				}
+				metadata, err := metadata.DecodeMetadata(raw)
+				if err != nil {
+					return nil, requestError{
+						id:  request.IDResponseErrorsDecode,
+						err: errors.Wrap(err, "failed to decode metadata"),
+					}
+				}
+				tctx.Metadata = *metadata
+
+			case "profile":
+				if err := validateContentType(http.Header(part.Header), pprofContentType); err != nil {
+					return nil, requestError{
+						id:  request.IDResponseErrorsValidate,
+						err: errors.Wrap(err, "invalid profile"),
+					}
+				}
+				r := &limitedReader{r: part, n: totalLimitRemaining}
+				profile, err := pprof_profile.Parse(r)
+				if err != nil {
+					if err, ok := r.err.(requestError); ok {
+						return nil, err
+					}
+					return nil, requestError{
+						id:  request.IDResponseErrorsDecode,
+						err: errors.Wrap(err, "failed to decode profile"),
+					}
+				}
+				profiles = append(profiles, profile)
+				totalLimitRemaining = r.n
+			}
+		}
+
+		transformables := make([]transform.Transformable, len(profiles))
+		for i, p := range profiles {
+			transformables[i] = profile.PprofProfile{Profile: p}
+		}
+
+		if err := report(c.Request.Context(), publish.PendingReq{
+			Transformables: transformables,
+			Tcontext:       tctx,
+		}); err != nil {
+			switch err {
+			case publish.ErrChannelClosed:
+				return nil, requestError{
+					id:  request.IDResponseErrorsShuttingDown,
+					err: errors.New("server is shutting down"),
+				}
+			case publish.ErrFull:
+				return nil, requestError{
+					id:  request.IDResponseErrorsFullQueue,
+					err: err,
+				}
+			}
+			return nil, err
+		}
+		return &result{Accepted: len(transformables)}, nil
+	}
+	return func(c *request.Context) {
+		result, err := handle(c)
+		if err != nil {
+			switch err := err.(type) {
+			case requestError:
+				c.Result.SetWithError(err.id, err)
+			default:
+				c.Result.SetWithError(request.IDResponseErrorsInternal, err)
+			}
+		} else {
+			c.Result.SetWithBody(request.IDResponseValidAccepted, result)
+		}
+		c.Write()
+	}
+}
+
+func validateContentType(header http.Header, contentType string) error {
+	got := header.Get(headers.ContentType)
+	if !strings.Contains(got, contentType) {
+		return fmt.Errorf("invalid content type %q, expected %q", got, contentType)
+	}
+	return nil
+}
+
+// limitedReader is like io.LimitedReader, but returns a
+// requestError upon detecting a request that is too large.
+//
+// Based on net/http.maxBytesReader.
+type limitedReader struct {
+	r   io.Reader
+	n   int64
+	err error
+}
+
+func (l *limitedReader) Read(p []byte) (n int, err error) {
+	if l.err != nil || len(p) == 0 {
+		return 0, l.err
+	}
+	if int64(len(p)) > l.n+1 {
+		p = p[:l.n+1]
+	}
+	n, err = l.r.Read(p)
+
+	if int64(n) <= l.n {
+		l.n -= int64(n)
+		l.err = err
+		return n, err
+	}
+
+	n = int(l.n)
+	l.n = 0
+	l.err = requestError{
+		id:  request.IDResponseErrorsRequestTooLarge,
+		err: errors.New("too large"),
+	}
+	return n, l.err
+}
+
+type result struct {
+	Accepted int `json:"accepted"`
+}
+
+type requestError struct {
+	id  request.ResultID
+	err error
+}
+
+func (e requestError) Error() string {
+	return e.err.Error()
+}
diff --git a/beater/api/profile/handler_test.go b/beater/api/profile/handler_test.go
new file mode 100644
index 00000000000..ea5bb0620f9
--- /dev/null
+++ b/beater/api/profile/handler_test.go
@@ -0,0 +1,287 @@
+// Licensed to Elasticsearch B.V. under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Elasticsearch B.V. licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package profile
+
+import (
+	"bytes"
+	"context"
+	"encoding/json"
+	"fmt"
+	"io"
+	"mime/multipart"
+	"net/http"
+	"net/http/httptest"
+	"net/textproto"
+	"runtime/pprof"
+	"strings"
+	"testing"
+
+	"github.com/elastic/apm-server/beater/api/ratelimit"
+	"github.com/elastic/apm-server/transform"
+
+	"github.com/stretchr/testify/assert"
+	"github.com/stretchr/testify/require"
+
+	"github.com/elastic/apm-server/beater/beatertest"
+	"github.com/elastic/apm-server/beater/headers"
+	"github.com/elastic/apm-server/beater/request"
+	"github.com/elastic/apm-server/decoder"
+	"github.com/elastic/apm-server/publish"
+)
+
+func TestHandler(t *testing.T) {
+	var rateLimit, err = ratelimit.NewStore(1, 0, 0)
+	require.NoError(t, err)
+	for name, tc := range map[string]testcaseIntakeHandler{
+		"MethodNotAllowed": {
+			r:  httptest.NewRequest(http.MethodGet, "/", nil),
+			id: request.IDResponseErrorsMethodNotAllowed,
+		},
+		"RequestInvalidContentType": {
+			r: func() *http.Request {
+				req := httptest.NewRequest(http.MethodPost, "/", nil)
+				req.Header.Set(headers.ContentType, "text/plain")
+				return req
+			}(),
+			id: request.IDResponseErrorsValidate,
+		},
+		"RateLimitExceeded": {
+			rateLimit: rateLimit,
+			id:        request.IDResponseErrorsRateLimit,
+		},
+		"Closing": {
+			reporter: func(t *testing.T) publish.Reporter {
+				return beatertest.ErrorReporterFn(publish.ErrChannelClosed)
+			},
+			id: request.IDResponseErrorsShuttingDown,
+		},
+		"FullQueue": {
+			reporter: func(t *testing.T) publish.Reporter {
+				return beatertest.ErrorReporterFn(publish.ErrFull)
+			},
+			id: request.IDResponseErrorsFullQueue,
+		},
+		"Empty": {
+			id:   request.IDResponseValidAccepted,
+			body: prettyJSON(map[string]interface{}{"accepted": 0}),
+		},
+		"UnknownPartIgnored": {
+			id:   request.IDResponseValidAccepted,
+			body: prettyJSON(map[string]interface{}{"accepted": 0}),
+			parts: []part{{
+				name:        "foo",
+				contentType: "text/plain",
+				body:        strings.NewReader(""),
+			}},
+		},
+
+		"MetadataTooLarge": {
+			id: request.IDResponseErrorsRequestTooLarge,
+			parts: []part{{
+				name:        "metadata",
+				contentType: "application/json",
+				body:        strings.NewReader("{" + strings.Repeat(" ", 10*1024) + "}"),
+			}},
+		},
+		"MetadataInvalidContentType": {
+			id: request.IDResponseErrorsValidate,
+			parts: []part{{
+				name:        "metadata",
+				contentType: "text/plain",
+				body:        strings.NewReader(`{"service":{"name":"foo","agent":{}}}`),
+			}},
+		},
+		"MetadataInvalidJSON": {
+			id: request.IDResponseErrorsDecode,
+			parts: []part{{
+				name:        "metadata",
+				contentType: "application/json",
+				body:        strings.NewReader("{..."),
+			}},
+		},
+		"MetadataInvalid": {
+			id: request.IDResponseErrorsValidate,
+			parts: []part{{
+				name:        "metadata",
+				contentType: "application/json",
+				body:        strings.NewReader("{}"), // does not validate
+			}},
+		},
+
+		"Profile": {
+			id: request.IDResponseValidAccepted,
+			parts: []part{
+				heapProfilePart(),
+				heapProfilePart(),
+				part{
+					name:        "metadata",
+					contentType: "application/json",
+					body:        strings.NewReader(`{"service":{"name":"foo","agent":{}}}`),
+				},
+			},
+			body:    prettyJSON(map[string]interface{}{"accepted": 2}),
+			reports: 1,
+			reporter: func(t *testing.T) publish.Reporter {
+				return func(ctx context.Context, req publish.PendingReq) error {
+					require.Len(t, req.Transformables, 2)
+					assert.Equal(t, "foo", *req.Tcontext.Metadata.Service.Name)
+					return nil
+				}
+			},
+		},
+		"ProfileInvalidContentType": {
+			id: request.IDResponseErrorsValidate,
+			parts: []part{{
+				name:        "profile",
+				contentType: "text/plain",
"text/plain", + body: strings.NewReader(""), + }}, + body: prettyJSON(map[string]interface{}{"accepted": 0}), + }, + "ProfileInvalid": { + id: request.IDResponseErrorsDecode, + parts: []part{{ + name: "profile", + contentType: "application/x-protobuf", + body: strings.NewReader("foo"), + }}, + body: prettyJSON(map[string]interface{}{"accepted": 0}), + }, + "ProfileTooLarge": { + id: request.IDResponseErrorsRequestTooLarge, + parts: []part{ + heapProfilePart(), + part{ + name: "profile", + contentType: "application/x-protobuf", + body: strings.NewReader(strings.Repeat("*", 10*1024*1024)), + }, + }, + body: prettyJSON(map[string]interface{}{"accepted": 0}), + }, + } { + t.Run(name, func(t *testing.T) { + tc.setup(t) + if tc.rateLimit != nil { + tc.c.RateLimiter = tc.rateLimit.ForIP(&http.Request{}) + } + Handler(tc.dec, transform.Config{}, tc.reporter(t))(tc.c) + + assert.Equal(t, string(tc.id), string(tc.c.Result.ID)) + resultStatus := request.MapResultIDToStatus[tc.id] + assert.Equal(t, resultStatus.Code, tc.w.Code) + assert.Equal(t, "application/json", tc.w.Header().Get(headers.ContentType)) + + assert.Zero(t, tc.reports) + if tc.id == request.IDResponseValidAccepted { + assert.Equal(t, tc.body, tc.w.Body.String()) + assert.Nil(t, tc.c.Result.Err) + } else { + assert.NotNil(t, tc.c.Result.Err) + assert.NotZero(t, tc.w.Body.Len()) + } + }) + } +} + +type testcaseIntakeHandler struct { + c *request.Context + w *httptest.ResponseRecorder + r *http.Request + dec decoder.ReqDecoder + rateLimit *ratelimit.Store + reporter func(t *testing.T) publish.Reporter + reports int + parts []part + + id request.ResultID + body string +} + +func (tc *testcaseIntakeHandler) setup(t *testing.T) { + if tc.dec == nil { + tc.dec = emptyDec + } + if tc.reporter == nil { + tc.reporter = func(t *testing.T) publish.Reporter { + return beatertest.NilReporter + } + } + if tc.reports > 0 { + orig := tc.reporter + tc.reporter = func(t *testing.T) publish.Reporter { + orig := orig(t) + return func(ctx context.Context, req publish.PendingReq) error { + tc.reports-- + return orig(ctx, req) + } + } + } + if tc.r == nil { + var buf bytes.Buffer + mpw := multipart.NewWriter(&buf) + for _, part := range tc.parts { + h := make(textproto.MIMEHeader) + h.Set("Content-Disposition", fmt.Sprintf(`form-data; name=%q`, part.name)) + h.Set("Content-Type", part.contentType) + + p, err := mpw.CreatePart(h) + require.NoError(t, err) + _, err = io.Copy(p, part.body) + require.NoError(t, err) + } + mpw.Close() + + tc.r = httptest.NewRequest(http.MethodPost, "/", &buf) + tc.r.Header.Set("Content-Type", mpw.FormDataContentType()) + } + tc.r.Header.Add("Accept", "application/json") + tc.w = httptest.NewRecorder() + tc.c = &request.Context{} + tc.c.Reset(tc.w, tc.r) +} + +func emptyDec(_ *http.Request) (map[string]interface{}, error) { + return map[string]interface{}{}, nil +} + +func heapProfilePart() part { + var buf bytes.Buffer + if err := pprof.WriteHeapProfile(&buf); err != nil { + panic(err) + } + return part{ + name: "profile", + contentType: "application/x-protobuf", + body: &buf, + } +} + +type part struct { + name string + contentType string + body io.Reader +} + +func prettyJSON(v interface{}) string { + var buf bytes.Buffer + enc := json.NewEncoder(&buf) + enc.SetIndent("", " ") + enc.Encode(v) + return buf.String() +} diff --git a/beater/beater.go b/beater/beater.go index be24dfde05f..d6cba45ee10 100644 --- a/beater/beater.go +++ b/beater/beater.go @@ -255,6 +255,19 @@ func initTracer(info beat.Info, cfg *config.Config, logger 
diff --git a/beater/beater.go b/beater/beater.go
index be24dfde05f..d6cba45ee10 100644
--- a/beater/beater.go
+++ b/beater/beater.go
@@ -255,6 +255,19 @@ func initTracer(info beat.Info, cfg *config.Config, logger *logp.Logger) (*apm.T
 		return tracer, nil, err
 	}
 
+	if cfg.SelfInstrumentation.Profiling.CPU.IsEnabled() {
+		interval := cfg.SelfInstrumentation.Profiling.CPU.Interval
+		duration := cfg.SelfInstrumentation.Profiling.CPU.Duration
+		logger.Infof("CPU profiling: every %s for %s", interval, duration)
+		os.Setenv("ELASTIC_APM_CPU_PROFILE_INTERVAL", fmt.Sprintf("%dms", int(interval.Seconds()*1000)))
+		os.Setenv("ELASTIC_APM_CPU_PROFILE_DURATION", fmt.Sprintf("%dms", int(duration.Seconds()*1000)))
+	}
+	if cfg.SelfInstrumentation.Profiling.Heap.IsEnabled() {
+		interval := cfg.SelfInstrumentation.Profiling.Heap.Interval
+		logger.Infof("Heap profiling: every %s", interval)
+		os.Setenv("ELASTIC_APM_HEAP_PROFILE_INTERVAL", fmt.Sprintf("%dms", int(interval.Seconds()*1000)))
+	}
+
 	var tracerTransport transport.Transport
 	var lis net.Listener
 	if cfg.SelfInstrumentation.Hosts != nil {
@@ -304,6 +317,7 @@ func initTracer(info beat.Info, cfg *config.Config, logger *logp.Logger) (*apm.T
 		return nil, nil, err
 	}
 	tracer.SetLogger(logp.NewLogger(logs.Tracing))
+
 	return tracer, lis, nil
 }
 
diff --git a/changelogs/head.asciidoc b/changelogs/head.asciidoc
index 7c9a5f8bd87..6e59d4dcc81 100644
--- a/changelogs/head.asciidoc
+++ b/changelogs/head.asciidoc
@@ -7,4 +7,5 @@ https://github.com/elastic/apm-server/compare/7.5\...master[View commits]
 
 ==== Added
 - Adds support for global labels in spans {pull}2806[2806].
 - Updated `go.elastic.co/apm` to v1.6.0, enabling central config {pull}2913[2913]
+- Added experimental support for continuous profiling of the server {pull}2839[2839]
diff --git a/model/profile/_meta/fields.yml b/model/profile/_meta/fields.yml
index 962c15f9fa4..e25f969edd5 100644
--- a/model/profile/_meta/fields.yml
+++ b/model/profile/_meta/fields.yml
@@ -7,6 +7,11 @@
     dynamic: false
     fields:
 
+      - name: id
+        type: keyword
+        description: >
+          The unique ID of the profile.
+
       - name: cpu
         type: group
         fields:
diff --git a/model/profile/profile.go b/model/profile/profile.go
index 81d9cea5fa7..46dfe5c42df 100644
--- a/model/profile/profile.go
+++ b/model/profile/profile.go
@@ -56,9 +56,18 @@ func (pp PprofProfile) Transform(tctx *transform.Context) []beat.Event {
 		valueFieldNames[i] = sampleType.Type + "." + sampleUnit
 	}
 
+	// Generate a unique ID for all samples in the profile.
+	var profileID string
+	if idBytes, err := common.RandomBytes(8); err == nil {
+		profileID = fmt.Sprintf("%x", idBytes)
+	}
+
 	samples := make([]beat.Event, len(pp.Profile.Sample))
 	for i, sample := range pp.Profile.Sample {
 		profileFields := common.MapStr{}
+		if profileID != "" {
+			profileFields["id"] = profileID
+		}
 		if pp.Profile.DurationNanos > 0 {
 			profileFields["duration"] = pp.Profile.DurationNanos
 		}
diff --git a/model/profile/profile_test.go b/model/profile/profile_test.go
index 19d9f022edc..696c136bdc2 100644
--- a/model/profile/profile_test.go
+++ b/model/profile/profile_test.go
@@ -94,6 +94,15 @@ func TestPprofProfileTransform(t *testing.T) {
 	output := pp.Transform(tctx)
 	require.Len(t, output, 2)
 	assert.Equal(t, output[0], output[1])
+
+	// Each profile is assigned a random ID; all samples within a
+	// document share the same random ID, which is checked by the
+	// equality assertion above.
+	profileField, ok := output[0].Fields["profile"].(common.MapStr)
+	require.True(t, ok)
+	assert.NotEmpty(t, profileField["id"])
+	delete(profileField, "id")
+
 	assert.Equal(t, beat.Event{
 		Timestamp: timestamp,
 		Fields: common.MapStr{
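
For reference, a minimal client-side sketch (not part of the patch above) of how a profile could be sent to the new endpoint. It builds the multipart body the handler expects -- a "metadata" part with Content-Type application/json and a "profile" part with Content-Type application/x-protobuf -- and POSTs it to /intake/v2/profile. The server address, service name, and the assumption that no secret token is configured are placeholders; the route is only registered when self-profiling is enabled, per the mux.go change.

// profile_client_sketch.go: hypothetical example, mirroring the handler tests.
package main

import (
	"bytes"
	"fmt"
	"mime/multipart"
	"net/http"
	"net/textproto"
	"runtime/pprof"
)

func main() {
	var body bytes.Buffer
	mpw := multipart.NewWriter(&body)

	// "metadata" part: the same minimal document the handler tests use.
	mh := make(textproto.MIMEHeader)
	mh.Set("Content-Disposition", `form-data; name="metadata"`)
	mh.Set("Content-Type", "application/json")
	mp, err := mpw.CreatePart(mh)
	if err != nil {
		panic(err)
	}
	fmt.Fprint(mp, `{"service":{"name":"example-service","agent":{}}}`)

	// "profile" part: a pprof-encoded (gzipped protobuf) heap profile of this process.
	ph := make(textproto.MIMEHeader)
	ph.Set("Content-Disposition", `form-data; name="profile"`)
	ph.Set("Content-Type", "application/x-protobuf")
	pp, err := mpw.CreatePart(ph)
	if err != nil {
		panic(err)
	}
	if err := pprof.WriteHeapProfile(pp); err != nil {
		panic(err)
	}
	mpw.Close()

	// Placeholder server address; adjust to the actual apm-server host.
	req, err := http.NewRequest(http.MethodPost, "http://localhost:8200/intake/v2/profile", &body)
	if err != nil {
		panic(err)
	}
	req.Header.Set("Content-Type", mpw.FormDataContentType())

	resp, err := http.DefaultClient.Do(req)
	if err != nil {
		panic(err)
	}
	defer resp.Body.Close()
	// On success the handler responds with a JSON body like {"accepted": 1}.
	fmt.Println(resp.Status)
}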