Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[elastic-agent] proxy requests to subprocesses to their metrics endpoints #28165

Merged
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 27 additions & 18 deletions x-pack/elastic-agent/pkg/core/monitoring/server/process.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,8 +67,14 @@ func processHandler(statsHandler func(http.ResponseWriter, *http.Request) error)
// proxy stats for elastic agent process
return statsHandler(w, r)
}
// TODO: allowlist of accepted endpoints to proxy to?
beatsEndpoint := vars["beatsEndpoint"]
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure if subprocesses will mostly be beats, so we can limit the allowed paths to /, /state, and /stats, or how you want to handle this.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

definitely add allowlist here, including some proper escaping

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what escaping are you looking for? I added just a direct match check for /, /state, and /stats, so I'm not sure what to escape


metricsBytes, statusCode, metricsErr := processMetrics(r.Context(), id)
endpoint, err := generateEndpoint(id)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I pulled this out so I could inject my own endpoint into processMetrics, to make testing easier.

if err != nil {
return err
}
metricsBytes, statusCode, metricsErr := processMetrics(r.Context(), endpoint, beatsEndpoint)
if metricsErr != nil {
return metricsErr
}
Expand All @@ -82,23 +88,8 @@ func processHandler(statsHandler func(http.ResponseWriter, *http.Request) error)
}
}

func processMetrics(ctx context.Context, id string) ([]byte, int, error) {
detail, err := parseID(id)
if err != nil {
return nil, 0, err
}

endpoint := beats.MonitoringEndpoint(detail.spec, artifact.DefaultConfig().OS(), detail.output)
if !strings.HasPrefix(endpoint, httpPlusPrefix) && !strings.HasPrefix(endpoint, "http") {
// add prefix for npipe and unix
endpoint = httpPlusPrefix + endpoint
}

if detail.isMonitoring {
endpoint += "_monitor"
}

hostData, err := parse.ParseURL(endpoint, "http", "", "", "stats", "")
func processMetrics(ctx context.Context, endpoint, path string) ([]byte, int, error) {
hostData, err := parse.ParseURL(endpoint, "http", "", "", path, "")
if err != nil {
return nil, 0, errorWithStatus(http.StatusInternalServerError, err)
}
Expand Down Expand Up @@ -145,6 +136,24 @@ func processMetrics(ctx context.Context, id string) ([]byte, int, error) {
return rb, resp.StatusCode, nil
}

func generateEndpoint(id string) (string, error) {
detail, err := parseID(id)
if err != nil {
return "", err
}

endpoint := beats.MonitoringEndpoint(detail.spec, artifact.DefaultConfig().OS(), detail.output)
if !strings.HasPrefix(endpoint, httpPlusPrefix) && !strings.HasPrefix(endpoint, "http") {
// add prefix for npipe and unix
endpoint = httpPlusPrefix + endpoint
}

if detail.isMonitoring {
endpoint += "_monitor"
}
return endpoint, nil
}

func writeResponse(w http.ResponseWriter, c interface{}) {
bytes, err := json.Marshal(c)
if err != nil {
Expand Down
31 changes: 31 additions & 0 deletions x-pack/elastic-agent/pkg/core/monitoring/server/process_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,15 @@
package server

import (
"context"
"errors"
"net"
"net/http"
"net/http/httptest"
"os"
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

Expand Down Expand Up @@ -87,3 +92,29 @@ func TestStatusErr(t *testing.T) {
})
}
}

func TestProcessProxyRequest(t *testing.T) {
sock := "/tmp/elastic-agent-test.sock"
defer os.Remove(sock)

endpoint := "http+unix://" + sock
server := httptest.NewUnstartedServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
// Write the path to the client so they can verify the request
// was correct
w.Write([]byte(r.URL.Path))
}))

// Mimic subprocesses and listen on a unix socket
l, err := net.Listen("unix", sock)
require.NoError(t, err)
server.Listener = l
server.Start()
defer server.Close()

for _, path := range []string{"/stats", "/", "/state"} {
respBytes, _, err := processMetrics(context.Background(), endpoint, path)
require.NoError(t, err)
// Verify that the server saw the path we tried to request
assert.Equal(t, path, string(respBytes))
}
}
2 changes: 2 additions & 0 deletions x-pack/elastic-agent/pkg/core/monitoring/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,8 @@ func exposeMetricsEndpoint(log *logger.Logger, config *common.Config, ns func(st
if enableProcessStats {
r.HandleFunc("/processes", processesHandler(routesFetchFn))
r.Handle("/processes/{processID}", createHandler(processHandler(statsHandler)))
r.Handle("/processes/{processID}/", createHandler(processHandler(statsHandler)))
r.Handle("/processes/{processID}/{beatsEndpoint}", createHandler(processHandler(statsHandler)))
}

mux := http.NewServeMux()
Expand Down