Skip to content

Commit

Permalink
Merge branch 'master' into java-attacher-config
Browse files Browse the repository at this point in the history
  • Loading branch information
stuartnelson3 committed Jun 22, 2021
2 parents 4834a7c + 9f7d9b6 commit b21a92a
Show file tree
Hide file tree
Showing 40 changed files with 710 additions and 842 deletions.
63 changes: 30 additions & 33 deletions beater/api/config/agent/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,18 +52,21 @@ var (
registry = monitoring.Default.NewRegistry("apm-server.acm")

errCacheControl = fmt.Sprintf("max-age=%v, must-revalidate", errMaxAgeDuration.Seconds())

// rumAgents keywords (new and old)
rumAgents = []string{"rum-js", "js-base"}
)

type handler struct {
f agentcfg.Fetcher

allowAnonymousAgents []string
cacheControl, defaultServiceEnvironment string
}

func NewHandler(f agentcfg.Fetcher, config config.KibanaAgentConfig, defaultServiceEnvironment string) request.Handler {
func NewHandler(
f agentcfg.Fetcher,
config config.KibanaAgentConfig,
defaultServiceEnvironment string,
allowAnonymousAgents []string,
) request.Handler {
if f == nil {
panic("fetcher must not be nil")
}
Expand All @@ -72,6 +75,7 @@ func NewHandler(f agentcfg.Fetcher, config config.KibanaAgentConfig, defaultServ
f: f,
cacheControl: cacheControl,
defaultServiceEnvironment: defaultServiceEnvironment,
allowAnonymousAgents: allowAnonymousAgents,
}

return h.Handle
Expand All @@ -83,13 +87,6 @@ func (h *handler) Handle(c *request.Context) {
// error handling
c.Header().Set(headers.CacheControl, errCacheControl)

ok := c.RateLimiter == nil || c.RateLimiter.Allow()
if !ok {
c.Result.SetDefault(request.IDResponseErrorsRateLimit)
c.Write()
return
}

query, queryErr := buildQuery(c)
if queryErr != nil {
extractQueryError(c, queryErr)
Expand All @@ -100,26 +97,29 @@ func (h *handler) Handle(c *request.Context) {
query.Service.Environment = h.defaultServiceEnvironment
}

if !c.AuthResult.Anonymous {
// The exact agent is not always known for anonymous clients, so we do not
// issue a secondary authorization check for them. Instead, we issue the
// request and filter the results using query.InsecureAgents.
authResource := authorization.Resource{ServiceName: query.Service.Name}
if result, err := authorization.AuthorizedFor(c.Request.Context(), authResource); err != nil {
c.Result.SetDefault(request.IDResponseErrorsServiceUnavailable)
c.Result.Err = err
c.Write()
return
} else if !result.Authorized {
id := request.IDResponseErrorsUnauthorized
status := request.MapResultIDToStatus[id]
if result.Reason != "" {
status.Keyword = result.Reason
}
c.Result.Set(id, status.Code, status.Keyword, nil, nil)
c.Write()
return
// Only service, and not agent, is known for config queries.
// For anonymous/untrusted agents, we filter the results using
// query.InsecureAgents below.
authResource := authorization.Resource{ServiceName: query.Service.Name}
authResult, err := authorization.AuthorizedFor(c.Request.Context(), authResource)
if err != nil {
c.Result.SetDefault(request.IDResponseErrorsServiceUnavailable)
c.Result.Err = err
c.Write()
return
}
if !authResult.Authorized {
id := request.IDResponseErrorsUnauthorized
status := request.MapResultIDToStatus[id]
if authResult.Reason != "" {
status.Keyword = authResult.Reason
}
c.Result.Set(id, status.Code, status.Keyword, nil, nil)
c.Write()
return
}
if authResult.Anonymous {
query.InsecureAgents = h.allowAnonymousAgents
}

result, err := h.f.Fetch(c.Request.Context(), query)
Expand Down Expand Up @@ -184,9 +184,6 @@ func buildQuery(c *request.Context) (agentcfg.Query, error) {
return query, errors.New(agentcfg.ServiceName + " is required")
}

if c.IsRum {
query.InsecureAgents = rumAgents
}
query.Etag = ifNoneMatch(c)
return query, nil
}
Expand Down
89 changes: 41 additions & 48 deletions beater/api/config/agent/handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@ import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.elastic.co/apm/apmtest"
"golang.org/x/time/rate"

"github.com/elastic/beats/v7/libbeat/common"
libkibana "github.com/elastic/beats/v7/libbeat/kibana"
Expand Down Expand Up @@ -173,18 +172,12 @@ func TestAgentConfigHandler(t *testing.T) {
var cfg = config.KibanaAgentConfig{Cache: config.Cache{Expiration: 4 * time.Second}}
for _, tc := range testcases {
f := agentcfg.NewKibanaFetcher(tc.kbClient, cfg.Cache.Expiration)
h := NewHandler(f, cfg, "")
h := NewHandler(f, cfg, "", nil)
r := httptest.NewRequest(tc.method, target(tc.queryParams), nil)
for k, v := range tc.requestHeader {
r.Header.Set(k, v)
}
ctx, w := newRequestContext(r)
ctx.AuthResult.Authorized = true
ctx.Request = withAuthorization(ctx.Request,
authorizedForFunc(func(context.Context, authorization.Resource) (authorization.Result, error) {
return authorization.Result{Authorized: true}, nil
}),
)
h(ctx)

require.Equal(t, tc.respStatus, w.Code)
Expand All @@ -202,32 +195,46 @@ func TestAgentConfigHandlerAnonymousAccess(t *testing.T) {
kbClient := kibanatest.MockKibana(http.StatusUnauthorized, m{"error": "Unauthorized"}, mockVersion, true)
cfg := config.KibanaAgentConfig{Cache: config.Cache{Expiration: time.Nanosecond}}
f := agentcfg.NewKibanaFetcher(kbClient, cfg.Cache.Expiration)
h := NewHandler(f, cfg, "")
h := NewHandler(f, cfg, "", nil)

for _, tc := range []struct {
anonymous bool
response string
anonymous bool
response string
authResource *authorization.Resource
}{{
anonymous: false,
response: `{"error":"APM Server is not authorized to query Kibana. Please configure apm-server.kibana.username and apm-server.kibana.password, and ensure the user has the necessary privileges."}`,
anonymous: false,
response: `{"error":"APM Server is not authorized to query Kibana. Please configure apm-server.kibana.username and apm-server.kibana.password, and ensure the user has the necessary privileges."}`,
authResource: &authorization.Resource{ServiceName: "opbeans"},
}, {
anonymous: true,
response: `{"error":"Unauthorized"}`,
anonymous: true,
response: `{"error":"Unauthorized"}`,
authResource: &authorization.Resource{ServiceName: "opbeans"},
}} {
r := httptest.NewRequest(http.MethodGet, target(map[string]string{"service.name": "opbeans"}), nil)
ctx, w := newRequestContext(r)
ctx.AuthResult.Authorized = true
ctx.AuthResult.Anonymous = tc.anonymous
ctx.Request = withAuthorization(ctx.Request, authorization.AnonymousAuth{})
h(ctx)
c, w := newRequestContext(r)
c.AuthResult.Authorized = true
c.AuthResult.Anonymous = tc.anonymous

var requestedResource *authorization.Resource
c.Request = withAuthorization(c.Request,
authorizedForFunc(func(ctx context.Context, resource authorization.Resource) (authorization.Result, error) {
if requestedResource != nil {
panic("expected only one AuthorizedFor request")
}
requestedResource = &resource
return c.AuthResult, nil
}),
)
h(c)
assert.Equal(t, tc.response+"\n", w.Body.String())
assert.Equal(t, tc.authResource, requestedResource)
}
}

func TestAgentConfigHandlerAuthorizedForService(t *testing.T) {
cfg := config.KibanaAgentConfig{Cache: config.Cache{Expiration: time.Nanosecond}}
f := agentcfg.NewKibanaFetcher(nil, cfg.Cache.Expiration)
h := NewHandler(f, cfg, "")
h := NewHandler(f, cfg, "", nil)

r := httptest.NewRequest(http.MethodGet, target(map[string]string{"service.name": "opbeans"}), nil)
ctx, w := newRequestContext(r)
Expand All @@ -249,7 +256,7 @@ func TestAgentConfigHandlerAuthorizedForService(t *testing.T) {
func TestAgentConfigHandler_NoKibanaClient(t *testing.T) {
cfg := config.KibanaAgentConfig{Cache: config.Cache{Expiration: time.Nanosecond}}
f := agentcfg.NewKibanaFetcher(nil, cfg.Cache.Expiration)
h := NewHandler(f, cfg, "")
h := NewHandler(f, cfg, "", nil)

w := sendRequest(h, httptest.NewRequest(http.MethodPost, "/config", convert.ToReader(m{
"service": m{"name": "opbeans-node"}})))
Expand All @@ -268,7 +275,7 @@ func TestAgentConfigHandler_PostOk(t *testing.T) {

var cfg = config.KibanaAgentConfig{Cache: config.Cache{Expiration: time.Nanosecond}}
f := agentcfg.NewKibanaFetcher(kb, cfg.Cache.Expiration)
h := NewHandler(f, cfg, "")
h := NewHandler(f, cfg, "", nil)

w := sendRequest(h, httptest.NewRequest(http.MethodPost, "/config", convert.ToReader(m{
"service": m{"name": "opbeans-node"}})))
Expand All @@ -289,7 +296,7 @@ func TestAgentConfigHandler_DefaultServiceEnvironment(t *testing.T) {

var cfg = config.KibanaAgentConfig{Cache: config.Cache{Expiration: time.Nanosecond}}
f := agentcfg.NewKibanaFetcher(kb, cfg.Cache.Expiration)
h := NewHandler(f, cfg, "default")
h := NewHandler(f, cfg, "default", nil)

sendRequest(h, httptest.NewRequest(http.MethodPost, "/config", convert.ToReader(m{"service": m{"name": "opbeans-node", "environment": "specified"}})))
sendRequest(h, httptest.NewRequest(http.MethodPost, "/config", convert.ToReader(m{"service": m{"name": "opbeans-node"}})))
Expand All @@ -306,8 +313,6 @@ func TestAgentConfigRum(t *testing.T) {
r := httptest.NewRequest(http.MethodPost, "/rum", convert.ToReader(m{
"service": m{"name": "opbeans"}}))
ctx, w := newRequestContext(r)
ctx.IsRum = true
ctx.AuthResult.Anonymous = true
h(ctx)
var actual map[string]string
json.Unmarshal(w.Body.Bytes(), &actual)
Expand All @@ -320,8 +325,6 @@ func TestAgentConfigRumEtag(t *testing.T) {
h := getHandler("rum-js")
r := httptest.NewRequest(http.MethodGet, "/rum?ifnonematch=123&service.name=opbeans", nil)
ctx, w := newRequestContext(r)
ctx.IsRum = true
ctx.AuthResult.Anonymous = true
h(ctx)
assert.Equal(t, http.StatusNotModified, w.Code, w.Body.String())
}
Expand All @@ -333,7 +336,7 @@ func TestAgentConfigNotRum(t *testing.T) {
ctx, w := newRequestContext(r)
ctx.Request = withAuthorization(ctx.Request,
authorizedForFunc(func(context.Context, authorization.Resource) (authorization.Result, error) {
return authorization.Result{Authorized: true}, nil
return authorization.Result{Authorized: true, Anonymous: false}, nil
}),
)
h(ctx)
Expand All @@ -348,30 +351,13 @@ func TestAgentConfigNoLeak(t *testing.T) {
r := httptest.NewRequest(http.MethodPost, "/rum", convert.ToReader(m{
"service": m{"name": "opbeans"}}))
ctx, w := newRequestContext(r)
ctx.IsRum = true
ctx.AuthResult.Anonymous = true
h(ctx)
var actual map[string]string
json.Unmarshal(w.Body.Bytes(), &actual)
assert.Equal(t, http.StatusOK, w.Code, w.Body.String())
assert.Equal(t, map[string]string{}, actual)
}

func TestAgentConfigRateLimit(t *testing.T) {
h := getHandler("rum-js")
r := httptest.NewRequest(http.MethodPost, "/rum", convert.ToReader(m{
"service": m{"name": "opbeans"}}))
ctx, w := newRequestContext(r)
ctx.IsRum = true
ctx.RateLimiter = rate.NewLimiter(rate.Limit(0), 0)
ctx.AuthResult.Anonymous = true
h(ctx)
var actual map[string]string
json.Unmarshal(w.Body.Bytes(), &actual)
assert.Equal(t, http.StatusTooManyRequests, w.Code, w.Body.String())
assert.Equal(t, map[string]string{"error": "too many requests"}, actual)
}

func getHandler(agent string) request.Handler {
kb := kibanatest.MockKibana(http.StatusOK, m{
"_id": "1",
Expand All @@ -386,7 +372,7 @@ func getHandler(agent string) request.Handler {
}, mockVersion, true)
cfg := config.KibanaAgentConfig{Cache: config.Cache{Expiration: time.Nanosecond}}
f := agentcfg.NewKibanaFetcher(kb, cfg.Cache.Expiration)
return NewHandler(f, cfg, "")
return NewHandler(f, cfg, "", []string{"rum-js"})
}

func TestIfNoneMatch(t *testing.T) {
Expand All @@ -412,7 +398,7 @@ func TestAgentConfigTraceContext(t *testing.T) {
client := kibana.NewConnectingClient(&kibanaCfg)
cfg := config.KibanaAgentConfig{Cache: config.Cache{Expiration: 5 * time.Minute}}
f := agentcfg.NewKibanaFetcher(client, cfg.Cache.Expiration)
handler := NewHandler(f, cfg, "default")
handler := NewHandler(f, cfg, "default", nil)
_, spans, _ := apmtest.WithTransaction(func(ctx context.Context) {
// When the handler is called with a context containing
// a transaction, the underlying Kibana query should create a span
Expand All @@ -439,6 +425,7 @@ func newRequestContext(r *http.Request) (*request.Context, *httptest.ResponseRec
w := httptest.NewRecorder()
ctx := request.NewContext()
ctx.Reset(w, r)
ctx.Request = withAnonymousAuthorization(ctx.Request)
return ctx, w
}

Expand Down Expand Up @@ -471,6 +458,12 @@ func (c *recordingKibanaClient) Send(ctx context.Context, method string, path st
return c.Client.Send(ctx, method, path, params, header, body)
}

func withAnonymousAuthorization(req *http.Request) *http.Request {
return withAuthorization(req, authorizedForFunc(func(context.Context, authorization.Resource) (authorization.Result, error) {
return authorization.Result{Authorized: true, Anonymous: true}, nil
}))
}

func withAuthorization(req *http.Request, auth authorization.Authorization) *http.Request {
return req.WithContext(authorization.ContextWithAuthorization(req.Context(), auth))
}
Expand Down
Loading

0 comments on commit b21a92a

Please sign in to comment.