diff --git a/_meta/beat.yml b/_meta/beat.yml index 9ef9edb1469..b183ca671d8 100644 --- a/_meta/beat.yml +++ b/_meta/beat.yml @@ -53,6 +53,18 @@ apm-server: # secret_token for the remote apm-servers. #secret_token: + # Enable profiling of the server, recording profile samples as events. + #profiling: + #cpu: + # Set to true to enable CPU profiling. + #enabled: false + #interval: 60s + #duration: 10s + #heap: + # Set to true to enable heap profiling. + #enabled: false + #interval: 60s + # A pipeline is a definition of processors applied to documents when ingesting them to Elasticsearch. # Using pipelines involves two steps: # (1) registering a pipeline diff --git a/apm-server.docker.yml b/apm-server.docker.yml index 875078b85aa..5acb7f44e97 100644 --- a/apm-server.docker.yml +++ b/apm-server.docker.yml @@ -53,6 +53,18 @@ apm-server: # secret_token for the remote apm-servers. #secret_token: + # Enable profiling of the server, recording profile samples as events. + #profiling: + #cpu: + # Set to true to enable CPU profiling. + #enabled: false + #interval: 60s + #duration: 10s + #heap: + # Set to true to enable heap profiling. + #enabled: false + #interval: 60s + # A pipeline is a definition of processors applied to documents when ingesting them to Elasticsearch. # Using pipelines involves two steps: # (1) registering a pipeline diff --git a/apm-server.yml b/apm-server.yml index 815c2a009ca..c8b88256e49 100644 --- a/apm-server.yml +++ b/apm-server.yml @@ -53,6 +53,18 @@ apm-server: # secret_token for the remote apm-servers. #secret_token: + # Enable profiling of the server, recording profile samples as events. + #profiling: + #cpu: + # Set to true to enable CPU profiling. + #enabled: false + #interval: 60s + #duration: 10s + #heap: + # Set to true to enable heap profiling. + #enabled: false + #interval: 60s + # A pipeline is a definition of processors applied to documents when ingesting them to Elasticsearch. # Using pipelines involves two steps: # (1) registering a pipeline diff --git a/beater/api/mux.go b/beater/api/mux.go index e10cf41de10..f75aab8eafc 100644 --- a/beater/api/mux.go +++ b/beater/api/mux.go @@ -27,6 +27,7 @@ import ( "github.com/elastic/apm-server/beater/api/asset/sourcemap" "github.com/elastic/apm-server/beater/api/config/agent" "github.com/elastic/apm-server/beater/api/intake" + "github.com/elastic/apm-server/beater/api/profile" "github.com/elastic/apm-server/beater/api/root" "github.com/elastic/apm-server/beater/config" "github.com/elastic/apm-server/beater/middleware" @@ -57,6 +58,9 @@ const ( // IntakeRUMPath defines the path to ingest monitored RUM events IntakeRUMPath = "/intake/v2/rum/events" + // ProfilePath defines the path to ingest profiles + ProfilePath = "/intake/v2/profile" + // AssetSourcemapPath defines the path to upload sourcemaps AssetSourcemapPath = "/assets/v1/sourcemaps" ) @@ -85,6 +89,15 @@ func NewMux(beaterConfig *config.Config, report publish.Reporter) (*http.ServeMu {IntakePath, backendHandler}, } + // Profiling is currently experimental, and intended for profiling the + // server itself, so we only add the route if self-profiling is enabled. + if beaterConfig.SelfInstrumentation.IsEnabled() { + if beaterConfig.SelfInstrumentation.Profiling.CPU.IsEnabled() || + beaterConfig.SelfInstrumentation.Profiling.Heap.IsEnabled() { + routeMap = append(routeMap, route{ProfilePath, profileHandler}) + } + } + for _, route := range routeMap { h, err := route.handlerFn(beaterConfig, report) if err != nil { @@ -102,6 +115,11 @@ func NewMux(beaterConfig *config.Config, report publish.Reporter) (*http.ServeMu return mux, nil } +func profileHandler(cfg *config.Config, reporter publish.Reporter) (request.Handler, error) { + h := profile.Handler(systemMetadataDecoder(cfg, emptyDecoder), transform.Config{}, reporter) + return middleware.Wrap(h, backendMiddleware(cfg, profile.MonitoringMap)...) +} + func backendHandler(cfg *config.Config, reporter publish.Reporter) (request.Handler, error) { h := intake.Handler(systemMetadataDecoder(cfg, emptyDecoder), &stream.Processor{ diff --git a/beater/api/profile/handler.go b/beater/api/profile/handler.go new file mode 100644 index 00000000000..0c1902fdfe8 --- /dev/null +++ b/beater/api/profile/handler.go @@ -0,0 +1,265 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package profile + +import ( + "fmt" + "io" + "net/http" + "strings" + + pprof_profile "github.com/google/pprof/profile" + "github.com/pkg/errors" + + "github.com/elastic/beats/libbeat/monitoring" + + "github.com/elastic/apm-server/beater/headers" + "github.com/elastic/apm-server/beater/request" + "github.com/elastic/apm-server/decoder" + "github.com/elastic/apm-server/model/metadata" + "github.com/elastic/apm-server/model/profile" + "github.com/elastic/apm-server/publish" + "github.com/elastic/apm-server/transform" + "github.com/elastic/apm-server/utility" + "github.com/elastic/apm-server/validation" +) + +var ( + // MonitoringMap holds a mapping for request.IDs to monitoring counters + MonitoringMap = request.MonitoringMapForRegistry(registry) + registry = monitoring.Default.NewRegistry("apm-server.profile", monitoring.PublishExpvar) +) + +const ( + // TODO(axw) include messageType in pprofContentType; needs fix in agent + pprofContentType = "application/x-protobuf" + metadataContentType = "application/json" + requestContentType = "multipart/form-data" +) + +// Handler returns a request.Handler for managing profile requests. +func Handler( + dec decoder.ReqDecoder, + transformConfig transform.Config, + report publish.Reporter, +) request.Handler { + handle := func(c *request.Context) (*result, error) { + if c.Request.Method != http.MethodPost { + return nil, requestError{ + id: request.IDResponseErrorsMethodNotAllowed, + err: errors.New("only POST requests are supported"), + } + } + if err := validateContentType(c.Request.Header, requestContentType); err != nil { + return nil, requestError{ + id: request.IDResponseErrorsValidate, + err: err, + } + } + + ok := c.RateLimiter == nil || c.RateLimiter.Allow() + if !ok { + return nil, requestError{ + id: request.IDResponseErrorsRateLimit, + err: errors.New("rate limit exceeded"), + } + } + + // Extract metadata from the request, like user-agent and remote address. + reqMeta, err := dec(c.Request) + if err != nil { + return nil, requestError{ + id: request.IDResponseErrorsDecode, + err: errors.Wrap(err, "failed to decode request metadata"), + } + } + + tctx := &transform.Context{ + RequestTime: utility.RequestTime(c.Request.Context()), + Config: transformConfig, + } + + var totalLimitRemaining int64 = 10 * 1024 * 1024 // 10 MiB ought to be enough for anybody + var profiles []*pprof_profile.Profile + mr, err := c.Request.MultipartReader() + if err != nil { + return nil, err + } + for { + part, err := mr.NextPart() + if err == io.EOF { + break + } else if err != nil { + return nil, err + } + + switch part.FormName() { + case "metadata": + if err := validateContentType(http.Header(part.Header), metadataContentType); err != nil { + return nil, requestError{ + id: request.IDResponseErrorsValidate, + err: errors.Wrap(err, "invalid metadata"), + } + } + r := &limitedReader{r: part, n: 10 * 1024 /* 10 KiB ought to be enough for anybody */} + raw, err := decoder.DecodeJSONData(r) + if err != nil { + if err, ok := r.err.(requestError); ok { + return nil, err + } + return nil, requestError{ + id: request.IDResponseErrorsDecode, + err: errors.Wrap(err, "failed to decode metadata JSON"), + } + } + for k, v := range reqMeta { + utility.InsertInMap(raw, k, v.(map[string]interface{})) + } + if err := validation.Validate(raw, metadata.ModelSchema()); err != nil { + return nil, requestError{ + id: request.IDResponseErrorsValidate, + err: errors.Wrap(err, "invalid metadata"), + } + } + metadata, err := metadata.DecodeMetadata(raw) + if err != nil { + return nil, requestError{ + id: request.IDResponseErrorsDecode, + err: errors.Wrap(err, "failed to decode metadata"), + } + } + tctx.Metadata = *metadata + + case "profile": + if err := validateContentType(http.Header(part.Header), pprofContentType); err != nil { + return nil, requestError{ + id: request.IDResponseErrorsValidate, + err: errors.Wrap(err, "invalid profile"), + } + } + r := &limitedReader{r: part, n: totalLimitRemaining} + profile, err := pprof_profile.Parse(r) + if err != nil { + if err, ok := r.err.(requestError); ok { + return nil, err + } + return nil, requestError{ + id: request.IDResponseErrorsDecode, + err: errors.Wrap(err, "failed to decode profile"), + } + } + profiles = append(profiles, profile) + totalLimitRemaining = r.n + } + } + + transformables := make([]transform.Transformable, len(profiles)) + for i, p := range profiles { + transformables[i] = profile.PprofProfile{Profile: p} + } + + if err := report(c.Request.Context(), publish.PendingReq{ + Transformables: transformables, + Tcontext: tctx, + }); err != nil { + switch err { + case publish.ErrChannelClosed: + return nil, requestError{ + id: request.IDResponseErrorsShuttingDown, + err: errors.New("server is shutting down"), + } + case publish.ErrFull: + return nil, requestError{ + id: request.IDResponseErrorsFullQueue, + err: err, + } + } + return nil, err + } + return &result{Accepted: len(transformables)}, nil + } + return func(c *request.Context) { + result, err := handle(c) + if err != nil { + switch err := err.(type) { + case requestError: + c.Result.SetWithError(err.id, err) + default: + c.Result.SetWithError(request.IDResponseErrorsInternal, err) + } + } else { + c.Result.SetWithBody(request.IDResponseValidAccepted, result) + } + c.Write() + } +} + +func validateContentType(header http.Header, contentType string) error { + got := header.Get(headers.ContentType) + if !strings.Contains(got, contentType) { + return fmt.Errorf("invalid content type %q, expected %q", got, contentType) + } + return nil +} + +// limitedReader is like io.LimitedReader, but returns a +// requestError upon detecting a request that is too large. +// +// Based on net/http.maxBytesReader. +type limitedReader struct { + r io.Reader + n int64 + err error +} + +func (l *limitedReader) Read(p []byte) (n int, err error) { + if l.err != nil || len(p) == 0 { + return 0, l.err + } + if int64(len(p)) > l.n+1 { + p = p[:l.n+1] + } + n, err = l.r.Read(p) + + if int64(n) <= l.n { + l.n -= int64(n) + l.err = err + return n, err + } + + n = int(l.n) + l.n = 0 + l.err = requestError{ + id: request.IDResponseErrorsRequestTooLarge, + err: errors.New("too large"), + } + return n, l.err +} + +type result struct { + Accepted int `json:"accepted"` +} + +type requestError struct { + id request.ResultID + err error +} + +func (e requestError) Error() string { + return e.err.Error() +} diff --git a/beater/api/profile/handler_test.go b/beater/api/profile/handler_test.go new file mode 100644 index 00000000000..ea5bb0620f9 --- /dev/null +++ b/beater/api/profile/handler_test.go @@ -0,0 +1,287 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package profile + +import ( + "bytes" + "context" + "encoding/json" + "fmt" + "io" + "mime/multipart" + "net/http" + "net/http/httptest" + "net/textproto" + "runtime/pprof" + "strings" + "testing" + + "github.com/elastic/apm-server/beater/api/ratelimit" + "github.com/elastic/apm-server/transform" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "github.com/elastic/apm-server/beater/beatertest" + "github.com/elastic/apm-server/beater/headers" + "github.com/elastic/apm-server/beater/request" + "github.com/elastic/apm-server/decoder" + "github.com/elastic/apm-server/publish" +) + +func TestHandler(t *testing.T) { + var rateLimit, err = ratelimit.NewStore(1, 0, 0) + require.NoError(t, err) + for name, tc := range map[string]testcaseIntakeHandler{ + "MethodNotAllowed": { + r: httptest.NewRequest(http.MethodGet, "/", nil), + id: request.IDResponseErrorsMethodNotAllowed, + }, + "RequestInvalidContentType": { + r: func() *http.Request { + req := httptest.NewRequest(http.MethodPost, "/", nil) + req.Header.Set(headers.ContentType, "text/plain") + return req + }(), + id: request.IDResponseErrorsValidate, + }, + "RateLimitExceeded": { + rateLimit: rateLimit, + id: request.IDResponseErrorsRateLimit, + }, + "Closing": { + reporter: func(t *testing.T) publish.Reporter { + return beatertest.ErrorReporterFn(publish.ErrChannelClosed) + }, + id: request.IDResponseErrorsShuttingDown, + }, + "FullQueue": { + reporter: func(t *testing.T) publish.Reporter { + return beatertest.ErrorReporterFn(publish.ErrFull) + }, + id: request.IDResponseErrorsFullQueue, + }, + "Empty": { + id: request.IDResponseValidAccepted, + body: prettyJSON(map[string]interface{}{"accepted": 0}), + }, + "UnknownPartIgnored": { + id: request.IDResponseValidAccepted, + body: prettyJSON(map[string]interface{}{"accepted": 0}), + parts: []part{{ + name: "foo", + contentType: "text/plain", + body: strings.NewReader(""), + }}, + }, + + "MetadataTooLarge": { + id: request.IDResponseErrorsRequestTooLarge, + parts: []part{{ + name: "metadata", + contentType: "application/json", + body: strings.NewReader("{" + strings.Repeat(" ", 10*1024) + "}"), + }}, + }, + "MetadataInvalidContentType": { + id: request.IDResponseErrorsValidate, + parts: []part{{ + name: "metadata", + contentType: "text/plain", + body: strings.NewReader(`{"service":{"name":"foo","agent":{}}}`), + }}, + }, + "MetadataInvalidJSON": { + id: request.IDResponseErrorsDecode, + parts: []part{{ + name: "metadata", + contentType: "application/json", + body: strings.NewReader("{..."), + }}, + }, + "MetadataInvalid": { + id: request.IDResponseErrorsValidate, + parts: []part{{ + name: "metadata", + contentType: "application/json", + body: strings.NewReader("{}"), // does not validate + }}, + }, + + "Profile": { + id: request.IDResponseValidAccepted, + parts: []part{ + heapProfilePart(), + heapProfilePart(), + part{ + name: "metadata", + contentType: "application/json", + body: strings.NewReader(`{"service":{"name":"foo","agent":{}}}`), + }, + }, + body: prettyJSON(map[string]interface{}{"accepted": 2}), + reports: 1, + reporter: func(t *testing.T) publish.Reporter { + return func(ctx context.Context, req publish.PendingReq) error { + require.Len(t, req.Transformables, 2) + assert.Equal(t, "foo", *req.Tcontext.Metadata.Service.Name) + return nil + } + }, + }, + "ProfileInvalidContentType": { + id: request.IDResponseErrorsValidate, + parts: []part{{ + name: "metadata", + contentType: "text/plain", + body: strings.NewReader(""), + }}, + body: prettyJSON(map[string]interface{}{"accepted": 0}), + }, + "ProfileInvalid": { + id: request.IDResponseErrorsDecode, + parts: []part{{ + name: "profile", + contentType: "application/x-protobuf", + body: strings.NewReader("foo"), + }}, + body: prettyJSON(map[string]interface{}{"accepted": 0}), + }, + "ProfileTooLarge": { + id: request.IDResponseErrorsRequestTooLarge, + parts: []part{ + heapProfilePart(), + part{ + name: "profile", + contentType: "application/x-protobuf", + body: strings.NewReader(strings.Repeat("*", 10*1024*1024)), + }, + }, + body: prettyJSON(map[string]interface{}{"accepted": 0}), + }, + } { + t.Run(name, func(t *testing.T) { + tc.setup(t) + if tc.rateLimit != nil { + tc.c.RateLimiter = tc.rateLimit.ForIP(&http.Request{}) + } + Handler(tc.dec, transform.Config{}, tc.reporter(t))(tc.c) + + assert.Equal(t, string(tc.id), string(tc.c.Result.ID)) + resultStatus := request.MapResultIDToStatus[tc.id] + assert.Equal(t, resultStatus.Code, tc.w.Code) + assert.Equal(t, "application/json", tc.w.Header().Get(headers.ContentType)) + + assert.Zero(t, tc.reports) + if tc.id == request.IDResponseValidAccepted { + assert.Equal(t, tc.body, tc.w.Body.String()) + assert.Nil(t, tc.c.Result.Err) + } else { + assert.NotNil(t, tc.c.Result.Err) + assert.NotZero(t, tc.w.Body.Len()) + } + }) + } +} + +type testcaseIntakeHandler struct { + c *request.Context + w *httptest.ResponseRecorder + r *http.Request + dec decoder.ReqDecoder + rateLimit *ratelimit.Store + reporter func(t *testing.T) publish.Reporter + reports int + parts []part + + id request.ResultID + body string +} + +func (tc *testcaseIntakeHandler) setup(t *testing.T) { + if tc.dec == nil { + tc.dec = emptyDec + } + if tc.reporter == nil { + tc.reporter = func(t *testing.T) publish.Reporter { + return beatertest.NilReporter + } + } + if tc.reports > 0 { + orig := tc.reporter + tc.reporter = func(t *testing.T) publish.Reporter { + orig := orig(t) + return func(ctx context.Context, req publish.PendingReq) error { + tc.reports-- + return orig(ctx, req) + } + } + } + if tc.r == nil { + var buf bytes.Buffer + mpw := multipart.NewWriter(&buf) + for _, part := range tc.parts { + h := make(textproto.MIMEHeader) + h.Set("Content-Disposition", fmt.Sprintf(`form-data; name=%q`, part.name)) + h.Set("Content-Type", part.contentType) + + p, err := mpw.CreatePart(h) + require.NoError(t, err) + _, err = io.Copy(p, part.body) + require.NoError(t, err) + } + mpw.Close() + + tc.r = httptest.NewRequest(http.MethodPost, "/", &buf) + tc.r.Header.Set("Content-Type", mpw.FormDataContentType()) + } + tc.r.Header.Add("Accept", "application/json") + tc.w = httptest.NewRecorder() + tc.c = &request.Context{} + tc.c.Reset(tc.w, tc.r) +} + +func emptyDec(_ *http.Request) (map[string]interface{}, error) { + return map[string]interface{}{}, nil +} + +func heapProfilePart() part { + var buf bytes.Buffer + if err := pprof.WriteHeapProfile(&buf); err != nil { + panic(err) + } + return part{ + name: "profile", + contentType: "application/x-protobuf", + body: &buf, + } +} + +type part struct { + name string + contentType string + body io.Reader +} + +func prettyJSON(v interface{}) string { + var buf bytes.Buffer + enc := json.NewEncoder(&buf) + enc.SetIndent("", " ") + enc.Encode(v) + return buf.String() +} diff --git a/beater/beater.go b/beater/beater.go index 60c856ba35d..929f655f0eb 100644 --- a/beater/beater.go +++ b/beater/beater.go @@ -266,6 +266,19 @@ func initTracer(info beat.Info, cfg *config.Config, logger *logp.Logger) (*apm.T return tracer, nil, err } + if cfg.SelfInstrumentation.Profiling.CPU.IsEnabled() { + interval := cfg.SelfInstrumentation.Profiling.CPU.Interval + duration := cfg.SelfInstrumentation.Profiling.CPU.Duration + logger.Infof("CPU profiling: every %s for %s", interval, duration) + os.Setenv("ELASTIC_APM_CPU_PROFILE_INTERVAL", interval.String()) + os.Setenv("ELASTIC_APM_CPU_PROFILE_DURATION", duration.String()) + } + if cfg.SelfInstrumentation.Profiling.Heap.IsEnabled() { + interval := cfg.SelfInstrumentation.Profiling.Heap.Interval + logger.Infof("Heap profiling: every %s", interval) + os.Setenv("ELASTIC_APM_HEAP_PROFILE_INTERVAL", interval.String()) + } + var tracerTransport transport.Transport var lis net.Listener if cfg.SelfInstrumentation.Hosts != nil { @@ -315,6 +328,7 @@ func initTracer(info beat.Info, cfg *config.Config, logger *logp.Logger) (*apm.T return nil, nil, err } tracer.SetLogger(logp.NewLogger(logs.Tracing)) + return tracer, lis, nil } diff --git a/beater/config/config.go b/beater/config/config.go index ce45edfe3ad..386ae914c5f 100644 --- a/beater/config/config.go +++ b/beater/config/config.go @@ -41,6 +41,10 @@ const ( DefaultAPMPipeline = "apm" msgInvalidConfigAgentCfg = "invalid value for `apm-server.agent.config.cache.expiration`, only accepting full seconds" + + defaultCPUProfilingInterval = 1 * time.Minute + defaultCPUProfilingDuration = 10 * time.Second + defaultHeapProfilingInterval = 1 * time.Minute ) // Config holds configuration information nested under the key `apm-server` @@ -130,10 +134,36 @@ type Cache struct { // InstrumentationConfig holds config information about self instrumenting the APM Server type InstrumentationConfig struct { - Enabled *bool `config:"enabled"` - Environment *string `config:"environment"` - Hosts urls `config:"hosts" validate:"nonzero"` - SecretToken string `config:"secret_token"` + Enabled *bool `config:"enabled"` + Environment *string `config:"environment"` + Hosts urls `config:"hosts" validate:"nonzero"` + Profiling ProfilingConfig `config:"profiling"` + SecretToken string `config:"secret_token"` +} + +// ProfilingConfig holds config information about self profiling the APM Server +type ProfilingConfig struct { + CPU *CPUProfiling `config:"cpu"` + Heap *HeapProfiling `config:"heap"` +} + +type CPUProfiling struct { + Enabled bool `config:"enabled"` + Interval time.Duration `config:"interval" validate:"positive"` + Duration time.Duration `config:"duration" validate:"positive"` +} + +func (p *CPUProfiling) IsEnabled() bool { + return p != nil && p.Enabled +} + +type HeapProfiling struct { + Enabled bool `config:"enabled"` + Interval time.Duration `config:"interval" validate:"positive"` +} + +func (p *HeapProfiling) IsEnabled() bool { + return p != nil && p.Enabled } //Mode enumerates the APM Server env @@ -189,7 +219,21 @@ func NewConfig(version string, ucfg *common.Config) (*Config, error) { return nil, errors.New(fmt.Sprintf("Invalid regex for `exclude_from_grouping`: %v", err.Error())) } } - + if c.SelfInstrumentation.IsEnabled() { + if c.SelfInstrumentation.Profiling.CPU.IsEnabled() { + if c.SelfInstrumentation.Profiling.CPU.Interval <= 0 { + c.SelfInstrumentation.Profiling.CPU.Interval = defaultCPUProfilingInterval + } + if c.SelfInstrumentation.Profiling.CPU.Duration <= 0 { + c.SelfInstrumentation.Profiling.CPU.Duration = defaultCPUProfilingDuration + } + } + if c.SelfInstrumentation.Profiling.Heap.IsEnabled() { + if c.SelfInstrumentation.Profiling.Heap.Interval <= 0 { + c.SelfInstrumentation.Profiling.Heap.Interval = defaultHeapProfilingInterval + } + } + } return c, nil } diff --git a/docs/fields.asciidoc b/docs/fields.asciidoc index ab305bf495d..83c6da2fef9 100644 --- a/docs/fields.asciidoc +++ b/docs/fields.asciidoc @@ -14,6 +14,7 @@ grouped in the following categories: * <> * <> +* <> * <> * <> * <> @@ -1107,6 +1108,171 @@ type: keyword -- +[[exported-fields-apm-profile]] +== APM Profile fields + +Profiling-specific data for APM. + + + + +*`profile.cpu.ns`*:: ++ +-- +Amount of CPU time profiled, in nanoseconds. + + +type: long + +-- + + +*`profile.samples.count`*:: ++ +-- +Number of profile samples for the profiling period. + + +type: long + +-- + + +*`profile.alloc_objects.count`*:: ++ +-- +Number of objects allocated since the process started. + + +type: long + +-- + + +*`profile.alloc_space.bytes`*:: ++ +-- +Amount of memory allocated, in bytes, since the process started. + + +type: long + +-- + + +*`profile.inuse_objects.count`*:: ++ +-- +Number of objects allocated and currently in use. + + +type: long + +-- + + +*`profile.inuse_space.bytes`*:: ++ +-- +Amount of memory allocated, in bytes, and currently in use. + + +type: long + +-- + +*`profile.duration`*:: ++ +-- +Duration of the span, in microseconds. + + +type: long + +-- + + +*`profile.top.id`*:: ++ +-- +Unique ID for the top stack frame in the context of its callers. + + +type: keyword + +-- + +*`profile.top.function`*:: ++ +-- +Function name for the top stack frame. + + +type: keyword + +-- + +*`profile.top.filename`*:: ++ +-- +Source code filename for the top stack frame. + + +type: keyword + +-- + +*`profile.top.line`*:: ++ +-- +Source code line number for the top stack frame. + + +type: long + +-- + + +*`profile.stack.id`*:: ++ +-- +Unique ID for a stack frame in the context of its callers. + + +type: keyword + +-- + +*`profile.stack.function`*:: ++ +-- +Function name for a stack frame. + + +type: keyword + +-- + +*`profile.stack.filename`*:: ++ +-- +Source code filename for a stack frame. + + +type: keyword + +-- + +*`profile.stack.line`*:: ++ +-- +Source code line number for a stack frame. + + +type: long + +-- + [[exported-fields-apm-sourcemap]] == APM Sourcemap fields diff --git a/include/fields.go b/include/fields.go index 30b605e5958..e8e1203a2a2 100644 --- a/include/fields.go +++ b/include/fields.go @@ -32,5 +32,5 @@ func init() { // AssetFieldsYml returns asset data. // This is the base64 encoded gzipped contents of fields.yml. func AssetFieldsYml() string { - return "" + return "" } diff --git a/model/profile/_meta/fields.yml b/model/profile/_meta/fields.yml new file mode 100644 index 00000000000..3f017a3a610 --- /dev/null +++ b/model/profile/_meta/fields.yml @@ -0,0 +1,115 @@ +- key: apm-profile + title: APM Profile + description: Profiling-specific data for APM. + fields: + - name: profile + type: group + dynamic: false + fields: + + - name: cpu + type: group + fields: + - name: ns + type: long + count: 1 + description: > + Amount of CPU time profiled, in nanoseconds. + + - name: samples + type: group + fields: + - name: count + type: long + count: 1 + description: > + Number of profile samples for the profiling period. + + - name: alloc_objects + type: group + fields: + - name: count + type: long + count: 1 + description: > + Number of objects allocated since the process started. + + - name: alloc_space + type: group + fields: + - name: bytes + type: long + count: 1 + description: > + Amount of memory allocated, in bytes, since the process started. + + - name: inuse_objects + type: group + fields: + - name: count + type: long + count: 1 + description: > + Number of objects allocated and currently in use. + + - name: inuse_space + type: group + fields: + - name: bytes + type: long + count: 1 + description: > + Amount of memory allocated, in bytes, and currently in use. + + # TODO(axw) cpu + + - name: duration + type: long + count: 1 + description: > + Duration of the span, in microseconds. + + - name: top + type: group + dynamic: false + fields: + - name: id + type: keyword + description: > + Unique ID for the top stack frame in the context of its callers. + - name: function + type: keyword + count: 1 + description: > + Function name for the top stack frame. + - name: filename + type: keyword + count: 1 + description: > + Source code filename for the top stack frame. + - name: line + type: long + count: 1 + description: > + Source code line number for the top stack frame. + + - name: stack + type: group + dynamic: false + fields: + - name: id + type: keyword + description: > + Unique ID for a stack frame in the context of its callers. + - name: function + type: keyword + description: > + Function name for a stack frame. + - name: filename + type: keyword + description: > + Source code filename for a stack frame. + - name: line + type: long + description: > + Source code line number for a stack frame. diff --git a/model/profile/profile.go b/model/profile/profile.go new file mode 100644 index 00000000000..85e4f5928b5 --- /dev/null +++ b/model/profile/profile.go @@ -0,0 +1,129 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package profile + +import ( + "fmt" + "hash/fnv" + "time" + + "github.com/google/pprof/profile" + + "github.com/elastic/apm-server/transform" + "github.com/elastic/apm-server/utility" + "github.com/elastic/beats/libbeat/beat" + "github.com/elastic/beats/libbeat/common" +) + +const ( + processorName = "profile" + profileDocType = "profile" +) + +var processorEntry = common.MapStr{ + "name": processorName, + "event": profileDocType, +} + +// Profile represents a complete profile. +type PprofProfile struct { + Profile *profile.Profile +} + +// Transforms transforms a Profile into a sequence of beat.Events: one per profile sample. +func (pp PprofProfile) Transform(tctx *transform.Context) []beat.Event { + // Precompute value field names for use in each event. + // TODO(axw) limit to well-known value names? + profileTimestamp := time.Unix(0, pp.Profile.TimeNanos) + valueFieldNames := make([]string, len(pp.Profile.SampleType)) + for i, sampleType := range pp.Profile.SampleType { + sampleUnit := normalizeUnit(sampleType.Unit) + valueFieldNames[i] = sampleType.Type + "." + sampleUnit + } + + samples := make([]beat.Event, len(pp.Profile.Sample)) + for i, sample := range pp.Profile.Sample { + profileFields := common.MapStr{} + if pp.Profile.DurationNanos > 0 { + profileFields["duration"] = pp.Profile.DurationNanos + } + if len(sample.Location) > 0 { + hash := fnv.New64a() + stack := make([]common.MapStr, len(sample.Location)) + for i := len(sample.Location) - 1; i >= 0; i-- { + loc := sample.Location[i] + line := loc.Line[0] // aggregated at function level + + // NOTE(axw) Currently we hash the function names so that + // we can aggregate stacks across multiple builds, or where + // binaries are not reproducible. + // + // If we decide to identify stack traces and frames using + // function addresses, then need to subtract the mapping's + // start address to eliminate the effects of ASLR, i.e. + // + // var buf [8]byte + // binary.BigEndian.PutUint64(buf[:], loc.Address-loc.Mapping.Start) + // hash.Write(buf[:]) + + hash.Write([]byte(line.Function.Name)) + fields := common.MapStr{ + "id": fmt.Sprintf("%x", hash.Sum(nil)), + "function": line.Function.Name, + } + if line.Function.Filename != "" { + utility.Set(fields, "filename", line.Function.Filename) + if line.Line > 0 { + utility.Set(fields, "line", line.Line) + } + } + stack[i] = fields + } + utility.Set(profileFields, "stack", stack) + utility.Set(profileFields, "top", stack[0]) + } + for i, v := range sample.Value { + utility.Set(profileFields, valueFieldNames[i], v) + } + event := beat.Event{ + Timestamp: profileTimestamp, + Fields: common.MapStr{ + "processor": processorEntry, + profileDocType: profileFields, + }, + } + tctx.Metadata.Set(event.Fields) + if len(sample.Label) > 0 { + labels := make(common.MapStr) + for k, v := range sample.Label { + utility.Set(labels, k, v) + } + utility.DeepUpdate(event.Fields, "labels", labels) + } + samples[i] = event + } + return samples +} + +func normalizeUnit(unit string) string { + switch unit { + case "nanoseconds": + unit = "ns" + } + return unit +} diff --git a/model/profile/profile_test.go b/model/profile/profile_test.go new file mode 100644 index 00000000000..34cfa11df87 --- /dev/null +++ b/model/profile/profile_test.go @@ -0,0 +1,132 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package profile_test + +import ( + "testing" + "time" + + pprof_profile "github.com/google/pprof/profile" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "github.com/elastic/apm-server/model/metadata" + "github.com/elastic/apm-server/model/profile" + "github.com/elastic/apm-server/sourcemap" + "github.com/elastic/apm-server/transform" + "github.com/elastic/beats/libbeat/beat" + "github.com/elastic/beats/libbeat/common" +) + +func TestPprofProfileTransform(t *testing.T) { + timestamp := time.Unix(123, 456) + pp := profile.PprofProfile{ + Profile: &pprof_profile.Profile{ + TimeNanos: timestamp.UnixNano(), + DurationNanos: int64(10 * time.Second), + SampleType: []*pprof_profile.ValueType{ + {Type: "cpu", Unit: "nanoseconds"}, + {Type: "inuse_space", Unit: "bytes"}, + }, + Sample: []*pprof_profile.Sample{{ + Value: []int64{123, 456}, + Label: map[string][]string{ + "key1": []string{"abc", "def"}, + "key2": []string{"ghi"}, + }, + Location: []*pprof_profile.Location{{ + Line: []pprof_profile.Line{{ + Function: &pprof_profile.Function{Name: "foo", Filename: "foo.go"}, + Line: 1, + }}, + }, { + Line: []pprof_profile.Line{{ + Function: &pprof_profile.Function{Name: "bar", Filename: "bar.go"}, + }}, + }}, + }, { + Value: []int64{123, 456}, + Label: map[string][]string{ + "key1": []string{"abc", "def"}, + "key2": []string{"ghi"}, + }, + Location: []*pprof_profile.Location{{ + Line: []pprof_profile.Line{{ + Function: &pprof_profile.Function{Name: "foo", Filename: "foo.go"}, + Line: 1, + }}, + }, { + Line: []pprof_profile.Line{{ + Function: &pprof_profile.Function{Name: "bar", Filename: "bar.go"}, + }}, + }}, + }}, + }, + } + + serviceName, env := "myService", "staging" + service := metadata.Service{ + Name: &serviceName, + Environment: &env, + } + metadata := metadata.Metadata{Service: &service} + + tctx := &transform.Context{ + Config: transform.Config{SourcemapMapper: &sourcemap.SmapMapper{}}, + Metadata: metadata, + RequestTime: time.Time{}, // not used + } + output := pp.Transform(tctx) + require.Len(t, output, 2) + assert.Equal(t, output[0], output[1]) + assert.Equal(t, beat.Event{ + Timestamp: timestamp, + Fields: common.MapStr{ + "processor": common.MapStr{"event": "profile", "name": "profile"}, + "service": common.MapStr{ + "name": "myService", + "environment": "staging", + }, + "labels": common.MapStr{ + "key1": []string{"abc", "def"}, + "key2": []string{"ghi"}, + }, + "profile": common.MapStr{ + "duration": int64(10 * time.Second), + "cpu.ns": int64(123), + "inuse_space.bytes": int64(456), + "top": common.MapStr{ + "function": "foo", + "filename": "foo.go", + "line": int64(1), + "id": "3c12369b3586048a", + }, + "stack": []common.MapStr{{ + "function": "foo", + "filename": "foo.go", + "line": int64(1), + "id": "3c12369b3586048a", + }, { + "function": "bar", + "filename": "bar.go", + "id": "003934191339461a", + }}, + }, + }, + }, output[0]) +}