Skip to content

Commit

Permalink
beater: even more refactoring (#5502)
Browse files Browse the repository at this point in the history
* beater: even more refactoring

- rate limiting middleware is now installed for
  both RUM and backend agent APIs, but only applies
  for anonymous clients (currently only RUM)
- rate limiting middleware now performs an initial
  Allow check at the request level, for consistent
  request rate limiting of those endpoints that are
  rate limited
- agent config now restricts "insecure" (RUM) agents
  on the basis that they are anonymous, rather than
  being RUM specifically. The list of insecure agent
  names (those allowed for anonymous auth) is now
  passed in

* make gofmt

* beater/api/profile: remove unused field
  • Loading branch information
axw authored Jun 21, 2021
1 parent ab082c3 commit d577ec8
Show file tree
Hide file tree
Showing 23 changed files with 403 additions and 378 deletions.
63 changes: 30 additions & 33 deletions beater/api/config/agent/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,18 +52,21 @@ var (
registry = monitoring.Default.NewRegistry("apm-server.acm")

errCacheControl = fmt.Sprintf("max-age=%v, must-revalidate", errMaxAgeDuration.Seconds())

// rumAgents keywords (new and old)
rumAgents = []string{"rum-js", "js-base"}
)

type handler struct {
f agentcfg.Fetcher

allowAnonymousAgents []string
cacheControl, defaultServiceEnvironment string
}

func NewHandler(f agentcfg.Fetcher, config config.KibanaAgentConfig, defaultServiceEnvironment string) request.Handler {
func NewHandler(
f agentcfg.Fetcher,
config config.KibanaAgentConfig,
defaultServiceEnvironment string,
allowAnonymousAgents []string,
) request.Handler {
if f == nil {
panic("fetcher must not be nil")
}
Expand All @@ -72,6 +75,7 @@ func NewHandler(f agentcfg.Fetcher, config config.KibanaAgentConfig, defaultServ
f: f,
cacheControl: cacheControl,
defaultServiceEnvironment: defaultServiceEnvironment,
allowAnonymousAgents: allowAnonymousAgents,
}

return h.Handle
Expand All @@ -83,13 +87,6 @@ func (h *handler) Handle(c *request.Context) {
// error handling
c.Header().Set(headers.CacheControl, errCacheControl)

ok := c.RateLimiter == nil || c.RateLimiter.Allow()
if !ok {
c.Result.SetDefault(request.IDResponseErrorsRateLimit)
c.Write()
return
}

query, queryErr := buildQuery(c)
if queryErr != nil {
extractQueryError(c, queryErr)
Expand All @@ -100,26 +97,29 @@ func (h *handler) Handle(c *request.Context) {
query.Service.Environment = h.defaultServiceEnvironment
}

if !c.AuthResult.Anonymous {
// The exact agent is not always known for anonymous clients, so we do not
// issue a secondary authorization check for them. Instead, we issue the
// request and filter the results using query.InsecureAgents.
authResource := authorization.Resource{ServiceName: query.Service.Name}
if result, err := authorization.AuthorizedFor(c.Request.Context(), authResource); err != nil {
c.Result.SetDefault(request.IDResponseErrorsServiceUnavailable)
c.Result.Err = err
c.Write()
return
} else if !result.Authorized {
id := request.IDResponseErrorsUnauthorized
status := request.MapResultIDToStatus[id]
if result.Reason != "" {
status.Keyword = result.Reason
}
c.Result.Set(id, status.Code, status.Keyword, nil, nil)
c.Write()
return
// Only service, and not agent, is known for config queries.
// For anonymous/untrusted agents, we filter the results using
// query.InsecureAgents below.
authResource := authorization.Resource{ServiceName: query.Service.Name}
authResult, err := authorization.AuthorizedFor(c.Request.Context(), authResource)
if err != nil {
c.Result.SetDefault(request.IDResponseErrorsServiceUnavailable)
c.Result.Err = err
c.Write()
return
}
if !authResult.Authorized {
id := request.IDResponseErrorsUnauthorized
status := request.MapResultIDToStatus[id]
if authResult.Reason != "" {
status.Keyword = authResult.Reason
}
c.Result.Set(id, status.Code, status.Keyword, nil, nil)
c.Write()
return
}
if authResult.Anonymous {
query.InsecureAgents = h.allowAnonymousAgents
}

result, err := h.f.Fetch(c.Request.Context(), query)
Expand Down Expand Up @@ -184,9 +184,6 @@ func buildQuery(c *request.Context) (agentcfg.Query, error) {
return query, errors.New(agentcfg.ServiceName + " is required")
}

if c.IsRum {
query.InsecureAgents = rumAgents
}
query.Etag = ifNoneMatch(c)
return query, nil
}
Expand Down
89 changes: 41 additions & 48 deletions beater/api/config/agent/handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@ import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.elastic.co/apm/apmtest"
"golang.org/x/time/rate"

"github.com/elastic/beats/v7/libbeat/common"
libkibana "github.com/elastic/beats/v7/libbeat/kibana"
Expand Down Expand Up @@ -173,18 +172,12 @@ func TestAgentConfigHandler(t *testing.T) {
var cfg = config.KibanaAgentConfig{Cache: config.Cache{Expiration: 4 * time.Second}}
for _, tc := range testcases {
f := agentcfg.NewKibanaFetcher(tc.kbClient, cfg.Cache.Expiration)
h := NewHandler(f, cfg, "")
h := NewHandler(f, cfg, "", nil)
r := httptest.NewRequest(tc.method, target(tc.queryParams), nil)
for k, v := range tc.requestHeader {
r.Header.Set(k, v)
}
ctx, w := newRequestContext(r)
ctx.AuthResult.Authorized = true
ctx.Request = withAuthorization(ctx.Request,
authorizedForFunc(func(context.Context, authorization.Resource) (authorization.Result, error) {
return authorization.Result{Authorized: true}, nil
}),
)
h(ctx)

require.Equal(t, tc.respStatus, w.Code)
Expand All @@ -202,32 +195,46 @@ func TestAgentConfigHandlerAnonymousAccess(t *testing.T) {
kbClient := kibanatest.MockKibana(http.StatusUnauthorized, m{"error": "Unauthorized"}, mockVersion, true)
cfg := config.KibanaAgentConfig{Cache: config.Cache{Expiration: time.Nanosecond}}
f := agentcfg.NewKibanaFetcher(kbClient, cfg.Cache.Expiration)
h := NewHandler(f, cfg, "")
h := NewHandler(f, cfg, "", nil)

for _, tc := range []struct {
anonymous bool
response string
anonymous bool
response string
authResource *authorization.Resource
}{{
anonymous: false,
response: `{"error":"APM Server is not authorized to query Kibana. Please configure apm-server.kibana.username and apm-server.kibana.password, and ensure the user has the necessary privileges."}`,
anonymous: false,
response: `{"error":"APM Server is not authorized to query Kibana. Please configure apm-server.kibana.username and apm-server.kibana.password, and ensure the user has the necessary privileges."}`,
authResource: &authorization.Resource{ServiceName: "opbeans"},
}, {
anonymous: true,
response: `{"error":"Unauthorized"}`,
anonymous: true,
response: `{"error":"Unauthorized"}`,
authResource: &authorization.Resource{ServiceName: "opbeans"},
}} {
r := httptest.NewRequest(http.MethodGet, target(map[string]string{"service.name": "opbeans"}), nil)
ctx, w := newRequestContext(r)
ctx.AuthResult.Authorized = true
ctx.AuthResult.Anonymous = tc.anonymous
ctx.Request = withAuthorization(ctx.Request, authorization.AnonymousAuth{})
h(ctx)
c, w := newRequestContext(r)
c.AuthResult.Authorized = true
c.AuthResult.Anonymous = tc.anonymous

var requestedResource *authorization.Resource
c.Request = withAuthorization(c.Request,
authorizedForFunc(func(ctx context.Context, resource authorization.Resource) (authorization.Result, error) {
if requestedResource != nil {
panic("expected only one AuthorizedFor request")
}
requestedResource = &resource
return c.AuthResult, nil
}),
)
h(c)
assert.Equal(t, tc.response+"\n", w.Body.String())
assert.Equal(t, tc.authResource, requestedResource)
}
}

func TestAgentConfigHandlerAuthorizedForService(t *testing.T) {
cfg := config.KibanaAgentConfig{Cache: config.Cache{Expiration: time.Nanosecond}}
f := agentcfg.NewKibanaFetcher(nil, cfg.Cache.Expiration)
h := NewHandler(f, cfg, "")
h := NewHandler(f, cfg, "", nil)

r := httptest.NewRequest(http.MethodGet, target(map[string]string{"service.name": "opbeans"}), nil)
ctx, w := newRequestContext(r)
Expand All @@ -249,7 +256,7 @@ func TestAgentConfigHandlerAuthorizedForService(t *testing.T) {
func TestAgentConfigHandler_NoKibanaClient(t *testing.T) {
cfg := config.KibanaAgentConfig{Cache: config.Cache{Expiration: time.Nanosecond}}
f := agentcfg.NewKibanaFetcher(nil, cfg.Cache.Expiration)
h := NewHandler(f, cfg, "")
h := NewHandler(f, cfg, "", nil)

w := sendRequest(h, httptest.NewRequest(http.MethodPost, "/config", convert.ToReader(m{
"service": m{"name": "opbeans-node"}})))
Expand All @@ -268,7 +275,7 @@ func TestAgentConfigHandler_PostOk(t *testing.T) {

var cfg = config.KibanaAgentConfig{Cache: config.Cache{Expiration: time.Nanosecond}}
f := agentcfg.NewKibanaFetcher(kb, cfg.Cache.Expiration)
h := NewHandler(f, cfg, "")
h := NewHandler(f, cfg, "", nil)

w := sendRequest(h, httptest.NewRequest(http.MethodPost, "/config", convert.ToReader(m{
"service": m{"name": "opbeans-node"}})))
Expand All @@ -289,7 +296,7 @@ func TestAgentConfigHandler_DefaultServiceEnvironment(t *testing.T) {

var cfg = config.KibanaAgentConfig{Cache: config.Cache{Expiration: time.Nanosecond}}
f := agentcfg.NewKibanaFetcher(kb, cfg.Cache.Expiration)
h := NewHandler(f, cfg, "default")
h := NewHandler(f, cfg, "default", nil)

sendRequest(h, httptest.NewRequest(http.MethodPost, "/config", convert.ToReader(m{"service": m{"name": "opbeans-node", "environment": "specified"}})))
sendRequest(h, httptest.NewRequest(http.MethodPost, "/config", convert.ToReader(m{"service": m{"name": "opbeans-node"}})))
Expand All @@ -306,8 +313,6 @@ func TestAgentConfigRum(t *testing.T) {
r := httptest.NewRequest(http.MethodPost, "/rum", convert.ToReader(m{
"service": m{"name": "opbeans"}}))
ctx, w := newRequestContext(r)
ctx.IsRum = true
ctx.AuthResult.Anonymous = true
h(ctx)
var actual map[string]string
json.Unmarshal(w.Body.Bytes(), &actual)
Expand All @@ -320,8 +325,6 @@ func TestAgentConfigRumEtag(t *testing.T) {
h := getHandler("rum-js")
r := httptest.NewRequest(http.MethodGet, "/rum?ifnonematch=123&service.name=opbeans", nil)
ctx, w := newRequestContext(r)
ctx.IsRum = true
ctx.AuthResult.Anonymous = true
h(ctx)
assert.Equal(t, http.StatusNotModified, w.Code, w.Body.String())
}
Expand All @@ -333,7 +336,7 @@ func TestAgentConfigNotRum(t *testing.T) {
ctx, w := newRequestContext(r)
ctx.Request = withAuthorization(ctx.Request,
authorizedForFunc(func(context.Context, authorization.Resource) (authorization.Result, error) {
return authorization.Result{Authorized: true}, nil
return authorization.Result{Authorized: true, Anonymous: false}, nil
}),
)
h(ctx)
Expand All @@ -348,30 +351,13 @@ func TestAgentConfigNoLeak(t *testing.T) {
r := httptest.NewRequest(http.MethodPost, "/rum", convert.ToReader(m{
"service": m{"name": "opbeans"}}))
ctx, w := newRequestContext(r)
ctx.IsRum = true
ctx.AuthResult.Anonymous = true
h(ctx)
var actual map[string]string
json.Unmarshal(w.Body.Bytes(), &actual)
assert.Equal(t, http.StatusOK, w.Code, w.Body.String())
assert.Equal(t, map[string]string{}, actual)
}

func TestAgentConfigRateLimit(t *testing.T) {
h := getHandler("rum-js")
r := httptest.NewRequest(http.MethodPost, "/rum", convert.ToReader(m{
"service": m{"name": "opbeans"}}))
ctx, w := newRequestContext(r)
ctx.IsRum = true
ctx.RateLimiter = rate.NewLimiter(rate.Limit(0), 0)
ctx.AuthResult.Anonymous = true
h(ctx)
var actual map[string]string
json.Unmarshal(w.Body.Bytes(), &actual)
assert.Equal(t, http.StatusTooManyRequests, w.Code, w.Body.String())
assert.Equal(t, map[string]string{"error": "too many requests"}, actual)
}

func getHandler(agent string) request.Handler {
kb := kibanatest.MockKibana(http.StatusOK, m{
"_id": "1",
Expand All @@ -386,7 +372,7 @@ func getHandler(agent string) request.Handler {
}, mockVersion, true)
cfg := config.KibanaAgentConfig{Cache: config.Cache{Expiration: time.Nanosecond}}
f := agentcfg.NewKibanaFetcher(kb, cfg.Cache.Expiration)
return NewHandler(f, cfg, "")
return NewHandler(f, cfg, "", []string{"rum-js"})
}

func TestIfNoneMatch(t *testing.T) {
Expand All @@ -412,7 +398,7 @@ func TestAgentConfigTraceContext(t *testing.T) {
client := kibana.NewConnectingClient(&kibanaCfg)
cfg := config.KibanaAgentConfig{Cache: config.Cache{Expiration: 5 * time.Minute}}
f := agentcfg.NewKibanaFetcher(client, cfg.Cache.Expiration)
handler := NewHandler(f, cfg, "default")
handler := NewHandler(f, cfg, "default", nil)
_, spans, _ := apmtest.WithTransaction(func(ctx context.Context) {
// When the handler is called with a context containing
// a transaction, the underlying Kibana query should create a span
Expand All @@ -439,6 +425,7 @@ func newRequestContext(r *http.Request) (*request.Context, *httptest.ResponseRec
w := httptest.NewRecorder()
ctx := request.NewContext()
ctx.Reset(w, r)
ctx.Request = withAnonymousAuthorization(ctx.Request)
return ctx, w
}

Expand Down Expand Up @@ -471,6 +458,12 @@ func (c *recordingKibanaClient) Send(ctx context.Context, method string, path st
return c.Client.Send(ctx, method, path, params, header, body)
}

func withAnonymousAuthorization(req *http.Request) *http.Request {
return withAuthorization(req, authorizedForFunc(func(context.Context, authorization.Resource) (authorization.Result, error) {
return authorization.Result{Authorized: true, Anonymous: true}, nil
}))
}

func withAuthorization(req *http.Request, auth authorization.Authorization) *http.Request {
return req.WithContext(authorization.ContextWithAuthorization(req.Context(), auth))
}
Expand Down
Loading

0 comments on commit d577ec8

Please sign in to comment.