From 57732ac955c24540205f20693d115e0819d221c5 Mon Sep 17 00:00:00 2001 From: Andrew Wilkins Date: Thu, 10 Jun 2021 18:36:47 +0800 Subject: [PATCH] authorization: introduce anonymous + agent/service-specific auth (#5422) * beater/authorization: introduce Resource type Introduce a Resource type, which describes a specific resource for which authorization is being queried. This can later be used to restrict access to specific agents and services. If the supplied resource is the zero value, then the query is interpreted as checking if the requester has any access at all. If the resource is non-zero, then the query is interpreted as checking if the requester has access to that specific resource (agent/service). * beater/authorization: add context functions * beater/authorization: introduce AnonymousAuth * beater: check authorization for agent+service (cherry picked from commit 05cde22c846915bc71b9902311d93ce2c1558ea8) --- beater/api/config/agent/handler.go | 68 ++++++--- beater/api/config/agent/handler_test.go | 141 ++++++++++++------ beater/api/mux.go | 1 + beater/api/root/handler.go | 2 +- beater/api/root/handler_test.go | 10 ++ beater/authorization/allow.go | 8 +- beater/authorization/allow_test.go | 2 +- beater/authorization/anonymous.go | 30 ++++ beater/authorization/apikey.go | 39 +++-- beater/authorization/apikey_test.go | 117 ++++++++------- beater/authorization/bearer.go | 4 +- beater/authorization/bearer_test.go | 2 +- beater/authorization/builder.go | 34 ++++- beater/authorization/context.go | 53 +++++++ beater/authorization/deny.go | 9 +- beater/authorization/deny_test.go | 2 +- beater/authprocessor.go | 43 ++++++ beater/grpcauth.go | 14 +- beater/http.go | 8 + beater/jaeger/common.go | 17 ++- beater/jaeger/grpc.go | 3 +- beater/jaeger/grpc_test.go | 49 ++++-- beater/middleware/authorization_middleware.go | 14 +- .../authorization_middleware_test.go | 7 +- beater/server.go | 6 + cmd/apikey.go | 4 +- 26 files changed, 498 insertions(+), 189 deletions(-) create mode 100644 beater/authorization/anonymous.go create mode 100644 beater/authorization/context.go create mode 100644 beater/authprocessor.go diff --git a/beater/api/config/agent/handler.go b/beater/api/config/agent/handler.go index 5d83b8a01cf..04086dcc9d2 100644 --- a/beater/api/config/agent/handler.go +++ b/beater/api/config/agent/handler.go @@ -30,6 +30,7 @@ import ( "github.com/elastic/beats/v7/libbeat/monitoring" "github.com/elastic/apm-server/agentcfg" + "github.com/elastic/apm-server/beater/authorization" "github.com/elastic/apm-server/beater/config" "github.com/elastic/apm-server/beater/headers" "github.com/elastic/apm-server/beater/request" @@ -91,7 +92,7 @@ func (h *handler) Handle(c *request.Context) { query, queryErr := buildQuery(c) if queryErr != nil { - extractQueryError(c, queryErr, c.AuthResult.Authorized) + extractQueryError(c, queryErr) c.Write() return } @@ -99,13 +100,35 @@ func (h *handler) Handle(c *request.Context) { query.Service.Environment = h.defaultServiceEnvironment } + if !c.AuthResult.Anonymous { + // The exact agent is not always known for anonymous clients, so we do not + // issue a secondary authorization check for them. Instead, we issue the + // request and filter the results using query.InsecureAgents. + authResource := authorization.Resource{ServiceName: query.Service.Name} + if result, err := authorization.AuthorizedFor(c.Request.Context(), authResource); err != nil { + c.Result.SetDefault(request.IDResponseErrorsServiceUnavailable) + c.Result.Err = err + c.Write() + return + } else if !result.Authorized { + id := request.IDResponseErrorsUnauthorized + status := request.MapResultIDToStatus[id] + if result.Reason != "" { + status.Keyword = result.Reason + } + c.Result.Set(id, status.Code, status.Keyword, nil, nil) + c.Write() + return + } + } + result, err := h.f.Fetch(c.Request.Context(), query) if err != nil { var verr *agentcfg.ValidationError if errors.As(err, &verr) { body := verr.Body() if strings.HasPrefix(body, agentcfg.ErrMsgKibanaVersionNotCompatible) { - body = authErrMsg(body, agentcfg.ErrMsgKibanaVersionNotCompatible, c.AuthResult.Authorized) + body = authErrMsg(c, body, agentcfg.ErrMsgKibanaVersionNotCompatible) } c.Result.Set( request.IDResponseErrorsServiceUnavailable, @@ -116,7 +139,7 @@ func (h *handler) Handle(c *request.Context) { ) } else { apm.CaptureError(c.Request.Context(), err).Send() - extractInternalError(c, err, c.AuthResult.Authorized) + extractInternalError(c, err) } c.Write() return @@ -135,12 +158,15 @@ func (h *handler) Handle(c *request.Context) { c.Write() } -func buildQuery(c *request.Context) (query agentcfg.Query, err error) { +func buildQuery(c *request.Context) (agentcfg.Query, error) { r := c.Request + var query agentcfg.Query switch r.Method { case http.MethodPost: - err = convert.FromReader(r.Body, &query) + if err := convert.FromReader(r.Body, &query); err != nil { + return query, err + } case http.MethodGet: params := r.URL.Query() query = agentcfg.Query{ @@ -150,41 +176,43 @@ func buildQuery(c *request.Context) (query agentcfg.Query, err error) { }, } default: - err = errors.Errorf("%s: %s", msgMethodUnsupported, r.Method) + if err := errors.Errorf("%s: %s", msgMethodUnsupported, r.Method); err != nil { + return query, err + } } - - if err == nil && query.Service.Name == "" { - err = errors.New(agentcfg.ServiceName + " is required") + if query.Service.Name == "" { + return query, errors.New(agentcfg.ServiceName + " is required") } + if c.IsRum { query.InsecureAgents = rumAgents } query.Etag = ifNoneMatch(c) - return + return query, nil } -func extractInternalError(c *request.Context, err error, withAuth bool) { +func extractInternalError(c *request.Context, err error) { msg := err.Error() var body interface{} var keyword string switch { case strings.Contains(msg, agentcfg.ErrMsgSendToKibanaFailed): - body = authErrMsg(msg, agentcfg.ErrMsgSendToKibanaFailed, withAuth) + body = authErrMsg(c, msg, agentcfg.ErrMsgSendToKibanaFailed) keyword = agentcfg.ErrMsgSendToKibanaFailed case strings.Contains(msg, agentcfg.ErrMsgReadKibanaResponse): - body = authErrMsg(msg, agentcfg.ErrMsgReadKibanaResponse, withAuth) + body = authErrMsg(c, msg, agentcfg.ErrMsgReadKibanaResponse) keyword = agentcfg.ErrMsgReadKibanaResponse case strings.Contains(msg, agentcfg.ErrUnauthorized): fullMsg := "APM Server is not authorized to query Kibana. " + "Please configure apm-server.kibana.username and apm-server.kibana.password, " + "and ensure the user has the necessary privileges." - body = authErrMsg(fullMsg, agentcfg.ErrUnauthorized, withAuth) + body = authErrMsg(c, fullMsg, agentcfg.ErrUnauthorized) keyword = agentcfg.ErrUnauthorized default: - body = authErrMsg(msg, msgServiceUnavailable, withAuth) + body = authErrMsg(c, msg, msgServiceUnavailable) keyword = msgServiceUnavailable } @@ -195,25 +223,25 @@ func extractInternalError(c *request.Context, err error, withAuth bool) { err) } -func extractQueryError(c *request.Context, err error, withAuth bool) { +func extractQueryError(c *request.Context, err error) { msg := err.Error() if strings.Contains(msg, msgMethodUnsupported) { c.Result.Set(request.IDResponseErrorsMethodNotAllowed, http.StatusMethodNotAllowed, msgMethodUnsupported, - authErrMsg(msg, msgMethodUnsupported, withAuth), + authErrMsg(c, msg, msgMethodUnsupported), err) return } c.Result.Set(request.IDResponseErrorsInvalidQuery, http.StatusBadRequest, msgInvalidQuery, - authErrMsg(msg, msgInvalidQuery, withAuth), + authErrMsg(c, msg, msgInvalidQuery), err) } -func authErrMsg(fullMsg, shortMsg string, withAuth bool) string { - if withAuth { +func authErrMsg(c *request.Context, fullMsg, shortMsg string) string { + if !c.AuthResult.Anonymous { return fullMsg } return shortMsg diff --git a/beater/api/config/agent/handler_test.go b/beater/api/config/agent/handler_test.go index 4e4c119b7c7..7dcbc1dc079 100644 --- a/beater/api/config/agent/handler_test.go +++ b/beater/api/config/agent/handler_test.go @@ -38,6 +38,7 @@ import ( libkibana "github.com/elastic/beats/v7/libbeat/kibana" "github.com/elastic/apm-server/agentcfg" + "github.com/elastic/apm-server/beater/authorization" "github.com/elastic/apm-server/beater/config" "github.com/elastic/apm-server/beater/headers" "github.com/elastic/apm-server/beater/request" @@ -60,7 +61,6 @@ var ( queryParams map[string]string method string respStatus int - respBodyToken map[string]string respBody map[string]string respEtagHeader, respCacheControlHeader string }{ @@ -99,7 +99,6 @@ var ( respEtagHeader: `"` + mockEtag + `"`, respCacheControlHeader: "max-age=4, must-revalidate", respBody: successBody, - respBodyToken: successBody, }, "NoConfigFound": { @@ -110,7 +109,6 @@ var ( respCacheControlHeader: "max-age=4, must-revalidate", respEtagHeader: fmt.Sprintf("\"%s\"", agentcfg.EtagSentinel), respBody: emptyBody, - respBodyToken: emptyBody, }, "SendToKibanaFailed": { @@ -119,8 +117,7 @@ var ( queryParams: map[string]string{"service.name": "opbeans-ruby"}, respStatus: http.StatusServiceUnavailable, respCacheControlHeader: "max-age=300, must-revalidate", - respBody: map[string]string{"error": agentcfg.ErrMsgSendToKibanaFailed}, - respBodyToken: map[string]string{"error": fmt.Sprintf("%s: testerror", agentcfg.ErrMsgSendToKibanaFailed)}, + respBody: map[string]string{"error": fmt.Sprintf("%s: testerror", agentcfg.ErrMsgSendToKibanaFailed)}, }, "NoConnection": { @@ -130,7 +127,6 @@ var ( respStatus: http.StatusServiceUnavailable, respCacheControlHeader: "max-age=300, must-revalidate", respBody: map[string]string{"error": agentcfg.ErrMsgNoKibanaConnection}, - respBodyToken: map[string]string{"error": agentcfg.ErrMsgNoKibanaConnection}, }, "InvalidVersion": { @@ -140,8 +136,7 @@ var ( queryParams: map[string]string{"service.name": "opbeans-node"}, respStatus: http.StatusServiceUnavailable, respCacheControlHeader: "max-age=300, must-revalidate", - respBody: map[string]string{"error": agentcfg.ErrMsgKibanaVersionNotCompatible}, - respBodyToken: map[string]string{"error": fmt.Sprintf("%s: min version 7.5.0, "+ + respBody: map[string]string{"error": fmt.Sprintf("%s: min version 7.5.0, "+ "configured version 7.2.0", agentcfg.ErrMsgKibanaVersionNotCompatible)}, }, @@ -149,8 +144,7 @@ var ( kbClient: kibanatest.MockKibana(http.StatusOK, m{}, mockVersion, true), method: http.MethodGet, respStatus: http.StatusBadRequest, - respBody: map[string]string{"error": msgInvalidQuery}, - respBodyToken: map[string]string{"error": "service.name is required"}, + respBody: map[string]string{"error": "service.name is required"}, respCacheControlHeader: "max-age=300, must-revalidate", }, @@ -159,8 +153,7 @@ var ( method: http.MethodPut, respStatus: http.StatusMethodNotAllowed, respCacheControlHeader: "max-age=300, must-revalidate", - respBody: map[string]string{"error": msgMethodUnsupported}, - respBodyToken: map[string]string{"error": fmt.Sprintf("%s: PUT", msgMethodUnsupported)}, + respBody: map[string]string{"error": fmt.Sprintf("%s: PUT", msgMethodUnsupported)}, }, "Unauthorized": { @@ -169,8 +162,7 @@ var ( queryParams: map[string]string{"service.name": "opbeans-node"}, respStatus: http.StatusServiceUnavailable, respCacheControlHeader: "max-age=300, must-revalidate", - respBody: map[string]string{"error": agentcfg.ErrUnauthorized}, - respBodyToken: map[string]string{"error": "APM Server is not authorized to query Kibana. " + + respBody: map[string]string{"error": "APM Server is not authorized to query Kibana. " + "Please configure apm-server.kibana.username and apm-server.kibana.password, " + "and ensure the user has the necessary privileges."}, }, @@ -179,40 +171,81 @@ var ( func TestAgentConfigHandler(t *testing.T) { var cfg = config.KibanaAgentConfig{Cache: config.Cache{Expiration: 4 * time.Second}} - - for name, tc := range testcases { - - runTest := func(t *testing.T, expectedBody map[string]string, authorized bool) { - f := agentcfg.NewKibanaFetcher(tc.kbClient, cfg.Cache.Expiration) - h := NewHandler(f, cfg, "") - r := httptest.NewRequest(tc.method, target(tc.queryParams), nil) - for k, v := range tc.requestHeader { - r.Header.Set(k, v) - } - ctx, w := newRequestContext(r) - ctx.AuthResult.Authorized = authorized - h(ctx) - - require.Equal(t, tc.respStatus, w.Code) - require.Equal(t, tc.respCacheControlHeader, w.Header().Get(headers.CacheControl)) - require.Equal(t, tc.respEtagHeader, w.Header().Get(headers.Etag)) - b, err := ioutil.ReadAll(w.Body) - require.NoError(t, err) - var actualBody map[string]string - json.Unmarshal(b, &actualBody) - assert.Equal(t, expectedBody, actualBody) + for _, tc := range testcases { + f := agentcfg.NewKibanaFetcher(tc.kbClient, cfg.Cache.Expiration) + h := NewHandler(f, cfg, "") + r := httptest.NewRequest(tc.method, target(tc.queryParams), nil) + for k, v := range tc.requestHeader { + r.Header.Set(k, v) } + ctx, w := newRequestContext(r) + ctx.AuthResult.Authorized = true + ctx.Request = withAuthorization(ctx.Request, + authorizedForFunc(func(context.Context, authorization.Resource) (authorization.Result, error) { + return authorization.Result{Authorized: true}, nil + }), + ) + h(ctx) + + require.Equal(t, tc.respStatus, w.Code) + require.Equal(t, tc.respCacheControlHeader, w.Header().Get(headers.CacheControl)) + require.Equal(t, tc.respEtagHeader, w.Header().Get(headers.Etag)) + b, err := ioutil.ReadAll(w.Body) + require.NoError(t, err) + var actualBody map[string]string + json.Unmarshal(b, &actualBody) + assert.Equal(t, tc.respBody, actualBody) + } +} - t.Run(name+"NoSecretToken", func(t *testing.T) { - runTest(t, tc.respBody, false) - }) +func TestAgentConfigHandlerAnonymousAccess(t *testing.T) { + kbClient := kibanatest.MockKibana(http.StatusUnauthorized, m{"error": "Unauthorized"}, mockVersion, true) + cfg := config.KibanaAgentConfig{Cache: config.Cache{Expiration: time.Nanosecond}} + f := agentcfg.NewKibanaFetcher(kbClient, cfg.Cache.Expiration) + h := NewHandler(f, cfg, "") - t.Run(name+"WithSecretToken", func(t *testing.T) { - runTest(t, tc.respBodyToken, true) - }) + for _, tc := range []struct { + anonymous bool + response string + }{{ + anonymous: false, + response: `{"error":"APM Server is not authorized to query Kibana. Please configure apm-server.kibana.username and apm-server.kibana.password, and ensure the user has the necessary privileges."}`, + }, { + anonymous: true, + response: `{"error":"Unauthorized"}`, + }} { + r := httptest.NewRequest(http.MethodGet, target(map[string]string{"service.name": "opbeans"}), nil) + ctx, w := newRequestContext(r) + ctx.AuthResult.Authorized = true + ctx.AuthResult.Anonymous = tc.anonymous + ctx.Request = withAuthorization(ctx.Request, authorization.AnonymousAuth{}) + h(ctx) + assert.Equal(t, tc.response+"\n", w.Body.String()) } } +func TestAgentConfigHandlerAuthorizedForService(t *testing.T) { + cfg := config.KibanaAgentConfig{Cache: config.Cache{Expiration: time.Nanosecond}} + f := agentcfg.NewKibanaFetcher(nil, cfg.Cache.Expiration) + h := NewHandler(f, cfg, "") + + r := httptest.NewRequest(http.MethodGet, target(map[string]string{"service.name": "opbeans"}), nil) + ctx, w := newRequestContext(r) + ctx.AuthResult.Authorized = true + + var queriedResource authorization.Resource + ctx.Request = withAuthorization(ctx.Request, + authorizedForFunc(func(ctx context.Context, resource authorization.Resource) (authorization.Result, error) { + queriedResource = resource + return authorization.Result{Authorized: false}, nil + }), + ) + h(ctx) + + assert.Equal(t, http.StatusUnauthorized, w.Code, w.Body.String()) + assert.Equal(t, authorization.Resource{ServiceName: "opbeans"}, queriedResource) +} + func TestAgentConfigHandler_NoKibanaClient(t *testing.T) { cfg := config.KibanaAgentConfig{Cache: config.Cache{Expiration: time.Nanosecond}} f := agentcfg.NewKibanaFetcher(nil, cfg.Cache.Expiration) @@ -274,6 +307,7 @@ func TestAgentConfigRum(t *testing.T) { "service": m{"name": "opbeans"}})) ctx, w := newRequestContext(r) ctx.IsRum = true + ctx.AuthResult.Anonymous = true h(ctx) var actual map[string]string json.Unmarshal(w.Body.Bytes(), &actual) @@ -287,6 +321,7 @@ func TestAgentConfigRumEtag(t *testing.T) { r := httptest.NewRequest(http.MethodGet, "/rum?ifnonematch=123&service.name=opbeans", nil) ctx, w := newRequestContext(r) ctx.IsRum = true + ctx.AuthResult.Anonymous = true h(ctx) assert.Equal(t, http.StatusNotModified, w.Code, w.Body.String()) } @@ -296,6 +331,11 @@ func TestAgentConfigNotRum(t *testing.T) { r := httptest.NewRequest(http.MethodPost, "/backend", convert.ToReader(m{ "service": m{"name": "opbeans"}})) ctx, w := newRequestContext(r) + ctx.Request = withAuthorization(ctx.Request, + authorizedForFunc(func(context.Context, authorization.Resource) (authorization.Result, error) { + return authorization.Result{Authorized: true}, nil + }), + ) h(ctx) var actual map[string]string json.Unmarshal(w.Body.Bytes(), &actual) @@ -309,6 +349,7 @@ func TestAgentConfigNoLeak(t *testing.T) { "service": m{"name": "opbeans"}})) ctx, w := newRequestContext(r) ctx.IsRum = true + ctx.AuthResult.Anonymous = true h(ctx) var actual map[string]string json.Unmarshal(w.Body.Bytes(), &actual) @@ -323,6 +364,7 @@ func TestAgentConfigRateLimit(t *testing.T) { ctx, w := newRequestContext(r) ctx.IsRum = true ctx.RateLimiter = rate.NewLimiter(rate.Limit(0), 0) + ctx.AuthResult.Anonymous = true h(ctx) var actual map[string]string json.Unmarshal(w.Body.Bytes(), &actual) @@ -384,6 +426,11 @@ func TestAgentConfigTraceContext(t *testing.T) { func sendRequest(h request.Handler, r *http.Request) *httptest.ResponseRecorder { ctx, recorder := newRequestContext(r) + ctx.Request = withAuthorization(ctx.Request, + authorizedForFunc(func(context.Context, authorization.Resource) (authorization.Result, error) { + return authorization.Result{Authorized: true}, nil + }), + ) h(ctx) return recorder } @@ -423,3 +470,13 @@ func (c *recordingKibanaClient) Send(ctx context.Context, method string, path st c.requests = append(c.requests, req.WithContext(ctx)) return c.Client.Send(ctx, method, path, params, header, body) } + +func withAuthorization(req *http.Request, auth authorization.Authorization) *http.Request { + return req.WithContext(authorization.ContextWithAuthorization(req.Context(), auth)) +} + +type authorizedForFunc func(context.Context, authorization.Resource) (authorization.Result, error) + +func (f authorizedForFunc) AuthorizedFor(ctx context.Context, resource authorization.Resource) (authorization.Result, error) { + return f(ctx, resource) +} diff --git a/beater/api/mux.go b/beater/api/mux.go index 2ee9ba6ae47..43300381b4d 100644 --- a/beater/api/mux.go +++ b/beater/api/mux.go @@ -239,6 +239,7 @@ func rumMiddleware(cfg *config.Config, _ *authorization.Handler, m map[request.R middleware.SetRumFlagMiddleware(), middleware.SetIPRateLimitMiddleware(cfg.RumConfig.EventRate), middleware.CORSMiddleware(cfg.RumConfig.AllowOrigins, cfg.RumConfig.AllowHeaders), + middleware.AnonymousAuthorizationMiddleware(), middleware.KillSwitchMiddleware(cfg.RumConfig.Enabled, msg), ) if cfg.AugmentEnabled { diff --git a/beater/api/root/handler.go b/beater/api/root/handler.go index 13178a37504..85101e8e4a3 100644 --- a/beater/api/root/handler.go +++ b/beater/api/root/handler.go @@ -56,7 +56,7 @@ func Handler(cfg HandlerConfig) request.Handler { return } c.Result.SetDefault(request.IDResponseValidOK) - if c.AuthResult.Authorized { + if c.AuthResult.Authorized && !c.AuthResult.Anonymous { c.Result.Body = serverInfo } c.Write() diff --git a/beater/api/root/handler_test.go b/beater/api/root/handler_test.go index c16fa5b7524..43b51dd9efa 100644 --- a/beater/api/root/handler_test.go +++ b/beater/api/root/handler_test.go @@ -65,4 +65,14 @@ func TestRootHandler(t *testing.T) { version.Commit()) assert.Equal(t, body, w.Body.String()) }) + + t.Run("authorized_anonymous", func(t *testing.T) { + c, w := beatertest.ContextWithResponseRecorder(http.MethodGet, "/") + c.AuthResult.Authorized = true + c.AuthResult.Anonymous = true + Handler(HandlerConfig{Version: "1.2.3"})(c) + + assert.Equal(t, http.StatusOK, w.Code) + assert.Equal(t, "", w.Body.String()) + }) } diff --git a/beater/authorization/allow.go b/beater/authorization/allow.go index 76379a23b02..407fc26786f 100644 --- a/beater/authorization/allow.go +++ b/beater/authorization/allow.go @@ -19,14 +19,12 @@ package authorization import ( "context" - - "github.com/elastic/apm-server/elasticsearch" ) -// allowAuth implements the Authorization interface. It allows all authorization requests. +// allowAuth implements the Authorization interface. type allowAuth struct{} -// AuthorizedFor always returns true -func (allowAuth) AuthorizedFor(context.Context, elasticsearch.Resource) (Result, error) { +// AuthorizedFor always returns a Result indicating the request is authorized. +func (allowAuth) AuthorizedFor(context.Context, Resource) (Result, error) { return Result{Authorized: true}, nil } diff --git a/beater/authorization/allow_test.go b/beater/authorization/allow_test.go index 8987be31b55..fe1d6098317 100644 --- a/beater/authorization/allow_test.go +++ b/beater/authorization/allow_test.go @@ -27,7 +27,7 @@ import ( func TestAllowAuth(t *testing.T) { handler := allowAuth{} - result, err := handler.AuthorizedFor(context.Background(), "") + result, err := handler.AuthorizedFor(context.Background(), Resource{}) assert.NoError(t, err) assert.Equal(t, Result{Authorized: true}, result) } diff --git a/beater/authorization/anonymous.go b/beater/authorization/anonymous.go new file mode 100644 index 00000000000..90f058453e3 --- /dev/null +++ b/beater/authorization/anonymous.go @@ -0,0 +1,30 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package authorization + +import ( + "context" +) + +// AnonymousAuth implements the Authorization interface. +type AnonymousAuth struct{} + +// AuthorizedFor always returns a Result indicating the request is authorized and anonymous. +func (AnonymousAuth) AuthorizedFor(context.Context, Resource) (Result, error) { + return Result{Anonymous: true, Authorized: true}, nil +} diff --git a/beater/authorization/apikey.go b/beater/authorization/apikey.go index ffeca3513a6..8bb430fa305 100644 --- a/beater/authorization/apikey.go +++ b/beater/authorization/apikey.go @@ -32,10 +32,10 @@ const ( // Application is a constant mapped to the "application" field for the Elasticsearch security API // This identifies privileges and keys created for APM Application es.AppName = "apm" + // ResourceInternal is only valid for first authorization of a request. // The API Key needs to grant privileges to additional resources for successful processing of requests. ResourceInternal = es.Resource("-") - ResourceAny = es.Resource("*") ) type apikeyBuilder struct { @@ -59,10 +59,19 @@ func (a *apikeyBuilder) forKey(key string) *apikeyAuth { } // AuthorizedFor checks if the configured api key is authorized. -// An api key is considered to be authorized when the api key has the configured privileges for the requested resource. -// Permissions are fetched from Elasticsearch and then cached in a global cache. -func (a *apikeyAuth) AuthorizedFor(ctx context.Context, resource es.Resource) (Result, error) { - privileges := a.cache.get(id(a.key, resource)) +// +// An API Key is considered to be authorized when the API Key has the configured privileges +// for the requested resource. Permissions are fetched from Elasticsearch and then cached in +// a global cache. +func (a *apikeyAuth) AuthorizedFor(ctx context.Context, _ Resource) (Result, error) { + // TODO if resource is non-zero, map to different application resources in the privilege queries. + // + // For now, having any valid "apm" application API Key grants access to any agent and service. + // In the future, it should be possible to have API Keys that can be restricted to a set of agent + // and service names. + esResource := ResourceInternal + + privileges := a.cache.get(id(a.key, esResource)) if privileges != nil { return Result{Authorized: a.allowed(privileges)}, nil } @@ -73,11 +82,11 @@ func (a *apikeyAuth) AuthorizedFor(ctx context.Context, resource es.Resource) (R "or consider increasing config option `apm-server.api_key.limit`") } - privileges, err := a.queryES(ctx, resource) + privileges, err := a.queryES(ctx, esResource) if err != nil { return Result{}, err } - a.cache.add(id(a.key, resource), privileges) + a.cache.add(id(a.key, esResource), privileges) return Result{Authorized: a.allowed(privileges)}, nil } @@ -96,15 +105,13 @@ func (a *apikeyAuth) allowed(permissions es.Permissions) bool { func (a *apikeyAuth) queryES(ctx context.Context, resource es.Resource) (es.Permissions, error) { request := es.HasPrivilegesRequest{ - Applications: []es.Application{ - { - Name: Application, - // it is important to query all privilege actions because they are cached by api key+resources - // querying a.anyOfPrivileges would result in an incomplete cache entry - Privileges: ActionsAll(), - Resources: []es.Resource{resource}, - }, - }, + Applications: []es.Application{{ + Name: Application, + // it is important to query all privilege actions because they are cached by api key+resources + // querying a.anyOfPrivileges would result in an incomplete cache entry + Privileges: ActionsAll(), + Resources: []es.Resource{resource}, + }}, } info, err := es.HasPrivileges(ctx, a.esClient, request, a.key) if err != nil { diff --git a/beater/authorization/apikey_test.go b/beater/authorization/apikey_test.go index d95dc012d9e..20d6e8d193c 100644 --- a/beater/authorization/apikey_test.go +++ b/beater/authorization/apikey_test.go @@ -27,28 +27,27 @@ import ( "github.com/stretchr/testify/require" "go.elastic.co/apm/apmtest" + "github.com/elastic/apm-server/beater/config" "github.com/elastic/apm-server/elasticsearch" "github.com/elastic/apm-server/elasticsearch/estest" ) func TestApikeyBuilder(t *testing.T) { // in case handler does not read from cache, but from ES an error is returned - tc := &apikeyTestcase{ - cache: newPrivilegesCache(time.Minute, 5), - transport: estest.NewTransport(t, http.StatusInternalServerError, nil)} + tc := &apikeyTestcase{transport: estest.NewTransport(t, http.StatusInternalServerError, nil)} tc.setup(t) key := "myApiKey" - handler1 := tc.builder.forKey(key) - handler2 := tc.builder.forKey(key) + handler1 := tc.builder.ForPrivilege(PrivilegeEventWrite.Action).AuthorizationFor("ApiKey", key) + handler2 := tc.builder.ForPrivilege(PrivilegeEventWrite.Action).AuthorizationFor("ApiKey", key) // add existing privileges to shared cache privilegesValid := elasticsearch.Permissions{} for _, p := range PrivilegesAll { privilegesValid[p.Action] = true } - resource := elasticsearch.Resource("service-go") - tc.cache.add(id(key, resource), privilegesValid) + resource := Resource{} + tc.cache.add(id(key, "-"), privilegesValid) // check that cache is actually shared between apiKeyHandlers result, err := handler1.AuthorizedFor(context.Background(), resource) @@ -62,41 +61,46 @@ func TestApikeyBuilder(t *testing.T) { func TestAPIKey_AuthorizedFor(t *testing.T) { t.Run("cache full", func(t *testing.T) { - tc := &apikeyTestcase{cache: newPrivilegesCache(time.Millisecond, 1)} + tc := &apikeyTestcase{apiKeyLimit: 1} tc.setup(t) - handler := tc.builder.forKey("") - result, err := handler.AuthorizedFor(context.Background(), "data:ingest") - assert.Equal(t, Result{Authorized: false}, result) - assert.NoError(t, err) + handler1 := tc.builder.ForPrivilege(PrivilegeEventWrite.Action).AuthorizationFor("ApiKey", "foo") + for i := 0; i < 2; i++ { + result, err := handler1.AuthorizedFor(context.Background(), Resource{}) + assert.Equal(t, Result{Authorized: true}, result) + assert.NoError(t, err) + } - result, err = handler.AuthorizedFor(context.Background(), "apm:read") - assert.Error(t, err) + handler2 := tc.builder.ForPrivilege(PrivilegeEventWrite.Action).AuthorizationFor("ApiKey", "bar") + result, err := handler2.AuthorizedFor(context.Background(), Resource{}) assert.Equal(t, Result{Authorized: false}, result) + assert.Error(t, err) // cache is full }) t.Run("from cache", func(t *testing.T) { // in case handler does not read from cache, but from ES an error is returned tc := &apikeyTestcase{transport: estest.NewTransport(t, http.StatusInternalServerError, nil)} tc.setup(t) - key := "" - handler := tc.builder.forKey(key) - resourceValid := elasticsearch.Resource("foo") - resourceInvalid := elasticsearch.Resource("bar") - resourceMissing := elasticsearch.Resource("missing") - tc.cache.add(id(key, resourceValid), elasticsearch.Permissions{tc.anyOfPrivileges[0]: true}) - tc.cache.add(id(key, resourceInvalid), elasticsearch.Permissions{tc.anyOfPrivileges[0]: false}) + keyValid := "foo" + keyInvalid := "bar" + keyMissing := "missing" + tc.cache.add(id(keyValid, "-"), elasticsearch.Permissions{PrivilegeEventWrite.Action: true}) + tc.cache.add(id(keyInvalid, "-"), elasticsearch.Permissions{PrivilegeEventWrite.Action: false}) + + handlerValid := tc.builder.ForPrivilege(PrivilegeEventWrite.Action).AuthorizationFor("ApiKey", keyValid) + handlerInvalid := tc.builder.ForPrivilege(PrivilegeEventWrite.Action).AuthorizationFor("ApiKey", keyInvalid) + handlerMissing := tc.builder.ForPrivilege(PrivilegeEventWrite.Action).AuthorizationFor("ApiKey", keyMissing) - result, err := handler.AuthorizedFor(context.Background(), resourceValid) + result, err := handlerValid.AuthorizedFor(context.Background(), Resource{}) require.NoError(t, err) assert.Equal(t, Result{Authorized: true}, result) - result, err = handler.AuthorizedFor(context.Background(), resourceInvalid) + result, err = handlerInvalid.AuthorizedFor(context.Background(), Resource{}) require.NoError(t, err) assert.Equal(t, Result{Authorized: false}, result) - result, err = handler.AuthorizedFor(context.Background(), resourceMissing) + result, err = handlerMissing.AuthorizedFor(context.Background(), Resource{}) require.Error(t, err) assert.Equal(t, Result{Authorized: false}, result) }) @@ -104,29 +108,30 @@ func TestAPIKey_AuthorizedFor(t *testing.T) { t.Run("from ES", func(t *testing.T) { tc := &apikeyTestcase{} tc.setup(t) - handler := tc.builder.forKey("key") - result, err := handler.AuthorizedFor(context.Background(), "foo") + handler := tc.builder.ForPrivilege(PrivilegeEventWrite.Action).AuthorizationFor("ApiKey", "key1") + result, err := handler.AuthorizedFor(context.Background(), Resource{}) require.NoError(t, err) assert.Equal(t, Result{Authorized: true}, result) - result, err = handler.AuthorizedFor(context.Background(), "bar") + handler = tc.builder.ForPrivilege(PrivilegeSourcemapWrite.Action).AuthorizationFor("ApiKey", "key2") + result, err = handler.AuthorizedFor(context.Background(), Resource{}) require.NoError(t, err) assert.Equal(t, Result{Authorized: false}, result) - result, err = handler.AuthorizedFor(context.Background(), "missing") + handler = tc.builder.ForPrivilege("missing").AuthorizationFor("ApiKey", "key3") + result, err = handler.AuthorizedFor(context.Background(), Resource{}) require.NoError(t, err) assert.Equal(t, Result{Authorized: false}, result) assert.Equal(t, 3, tc.cache.cache.ItemCount()) }) t.Run("client error", func(t *testing.T) { - tc := &apikeyTestcase{ - transport: estest.NewTransport(t, -1, nil)} + tc := &apikeyTestcase{transport: estest.NewTransport(t, -1, nil)} tc.setup(t) - handler := tc.builder.forKey("12a3") + handler := tc.builder.ForPrivilege(PrivilegeEventWrite.Action).AuthorizationFor("ApiKey", "12a3") - result, err := handler.AuthorizedFor(context.Background(), "xyz") + result, err := handler.AuthorizedFor(context.Background(), Resource{}) require.Error(t, err) assert.Contains(t, err.Error(), "client error") assert.Equal(t, Result{Authorized: false}, result) @@ -136,9 +141,9 @@ func TestAPIKey_AuthorizedFor(t *testing.T) { t.Run("unauthorized status from ES", func(t *testing.T) { tc := &apikeyTestcase{transport: estest.NewTransport(t, http.StatusUnauthorized, nil)} tc.setup(t) - handler := tc.builder.forKey("12a3") + handler := tc.builder.ForPrivilege(PrivilegeEventWrite.Action).AuthorizationFor("ApiKey", "12a3") - result, err := handler.AuthorizedFor(context.Background(), "xyz") + result, err := handler.AuthorizedFor(context.Background(), Resource{}) require.NoError(t, err) assert.Equal(t, Result{Authorized: false}, result) assert.Equal(t, 1, tc.cache.cache.ItemCount()) // unauthorized responses are cached @@ -147,9 +152,9 @@ func TestAPIKey_AuthorizedFor(t *testing.T) { t.Run("invalid status from ES", func(t *testing.T) { tc := &apikeyTestcase{transport: estest.NewTransport(t, http.StatusNotFound, nil)} tc.setup(t) - handler := tc.builder.forKey("12a3") + handler := tc.builder.ForPrivilege(PrivilegeEventWrite.Action).AuthorizationFor("ApiKey", "12a3") - result, err := handler.AuthorizedFor(context.Background(), "xyz") + result, err := handler.AuthorizedFor(context.Background(), Resource{}) require.Error(t, err) assert.Equal(t, Result{Authorized: false}, result) assert.Equal(t, 0, tc.cache.cache.ItemCount()) @@ -158,8 +163,8 @@ func TestAPIKey_AuthorizedFor(t *testing.T) { t.Run("decode error from ES", func(t *testing.T) { tc := &apikeyTestcase{transport: estest.NewTransport(t, http.StatusOK, nil)} tc.setup(t) - handler := tc.builder.forKey("123") - result, err := handler.AuthorizedFor(context.Background(), "foo") + handler := tc.builder.ForPrivilege(PrivilegeEventWrite.Action).AuthorizationFor("ApiKey", "123") + result, err := handler.AuthorizedFor(context.Background(), Resource{}) require.Error(t, err) assert.Equal(t, Result{Authorized: false}, result) assert.Zero(t, tc.cache.cache.ItemCount()) @@ -167,12 +172,12 @@ func TestAPIKey_AuthorizedFor(t *testing.T) { } type apikeyTestcase struct { - transport *estest.Transport - client elasticsearch.Client - cache *privilegesCache - anyOfPrivileges []elasticsearch.PrivilegeAction + transport *estest.Transport + client elasticsearch.Client + cache *privilegesCache + apiKeyLimit int - builder *apikeyBuilder + builder *Builder } func (tc *apikeyTestcase) setup(t *testing.T) { @@ -182,20 +187,24 @@ func (tc *apikeyTestcase) setup(t *testing.T) { tc.transport = estest.NewTransport(t, http.StatusOK, map[string]interface{}{ "application": map[string]interface{}{ "apm": map[string]map[string]interface{}{ - "foo": {"config_agent:read": true, "event:write": true, "sourcemap:write": false}, - "bar": {"config_agent:read": true, "event:write": false}, - }}}) + "-": {"config_agent:read": true, "event:write": true, "sourcemap:write": false}, + }, + }, + }) } tc.client, err = estest.NewElasticsearchClient(tc.transport) require.NoError(t, err) } - if tc.cache == nil { - tc.cache = newPrivilegesCache(time.Minute, 5) - } - if tc.anyOfPrivileges == nil { - tc.anyOfPrivileges = []elasticsearch.PrivilegeAction{PrivilegeEventWrite.Action, PrivilegeSourcemapWrite.Action} + + cfg := config.DefaultConfig() + cfg.APIKeyConfig.Enabled = true + if tc.apiKeyLimit > 0 { + cfg.APIKeyConfig.LimitPerMin = tc.apiKeyLimit } - tc.builder = newApikeyBuilder(tc.client, tc.cache, tc.anyOfPrivileges) + tc.builder, err = NewBuilder(cfg) + require.NoError(t, err) + tc.builder.apikey.esClient = tc.client + tc.cache = tc.builder.apikey.cache } func TestApikeyBuilderTraceContext(t *testing.T) { @@ -212,8 +221,8 @@ func TestApikeyBuilderTraceContext(t *testing.T) { // When AuthorizedFor is called with a context containing // a transaction, the underlying Elasticsearch query should // create a span. - handler.AuthorizedFor(ctx, ResourceInternal) - handler.AuthorizedFor(ctx, ResourceInternal) // cached, no query + handler.AuthorizedFor(ctx, Resource{}) + handler.AuthorizedFor(ctx, Resource{}) // cached, no query }) require.Len(t, spans, 1) assert.Equal(t, "elasticsearch", spans[0].Subtype) diff --git a/beater/authorization/bearer.go b/beater/authorization/bearer.go index 0c49644e203..6ae9425c7f3 100644 --- a/beater/authorization/bearer.go +++ b/beater/authorization/bearer.go @@ -20,8 +20,6 @@ package authorization import ( "context" "crypto/subtle" - - "github.com/elastic/apm-server/elasticsearch" ) type bearerBuilder struct { @@ -38,6 +36,6 @@ func (b bearerBuilder) forToken(token string) *bearerAuth { } } -func (b *bearerAuth) AuthorizedFor(context.Context, elasticsearch.Resource) (Result, error) { +func (b *bearerAuth) AuthorizedFor(context.Context, Resource) (Result, error) { return Result{Authorized: b.authorized}, nil } diff --git a/beater/authorization/bearer_test.go b/beater/authorization/bearer_test.go index 8130b278dd5..3140f7c7319 100644 --- a/beater/authorization/bearer_test.go +++ b/beater/authorization/bearer_test.go @@ -38,7 +38,7 @@ func TestBearerAuth(t *testing.T) { } { t.Run(name, func(t *testing.T) { bearer := tc.builder.forToken(tc.token) - result, err := bearer.AuthorizedFor(context.Background(), "") + result, err := bearer.AuthorizedFor(context.Background(), Resource{}) assert.NoError(t, err) assert.Equal(t, Result{Authorized: tc.authorized}, result) }) diff --git a/beater/authorization/builder.go b/beater/authorization/builder.go index bc3ea9956f9..cb316bbb9d1 100644 --- a/beater/authorization/builder.go +++ b/beater/authorization/builder.go @@ -38,13 +38,41 @@ type Handler Builder // Authorization interface to be implemented by different auth types type Authorization interface { - AuthorizedFor(context.Context, elasticsearch.Resource) (Result, error) + // AuthorizedFor reports whether the agent is authorized for access to + // the given resource. + // + // When resource is zero, AuthorizedFor indicates whether the agent is + // allowed any access at all. When resource is non-zero, AllowedFor + // indicates whether the agent has access to the specific resource. + AuthorizedFor(context.Context, Resource) (Result, error) +} + +// Resource holds parameters for restricting access that may be checked by +// Authorization.AuthorizedFor. +type Resource struct { + // AgentName holds the agent name associated with the agent making the + // request. This may be empty if the agent is unknown or irrelevant, + // such as in a request to the healthcheck endpoint. + AgentName string + + // ServiceName holds the service name associated with the agent making + // the request. This may be empty if the agent is unknown or irrelevant, + // such as in a request to the healthcheck endpoint. + ServiceName string } // Result holds a result of calling Authorization.AuthorizedFor. type Result struct { - // Authorized indicates whether or not the authorization - // attempt was successful. + // Anonymous indicates whether or not the client has been granted anonymous access. + // + // Anonymous may be be false when no authentication/authorization is required, + // even if the client has not presented any credentials. + Anonymous bool + + // Authorized indicates whether or not the authorization attempt was successful. + // + // It is possible that a result is both Anonymous and Authorized, when limited + // anonymous access is permitted (e.g. for RUM). Authorized bool // Reason holds an optional reason for unauthorized results. diff --git a/beater/authorization/context.go b/beater/authorization/context.go new file mode 100644 index 00000000000..bbc7c745115 --- /dev/null +++ b/beater/authorization/context.go @@ -0,0 +1,53 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package authorization + +import ( + "context" + "errors" +) + +// ErrNoAuthorization is returned from AuthorizedFor when the context does not +// contain an Authorization. +var ErrNoAuthorization = errors.New("no authorization.Authorization in context") + +type authorizationKey struct{} + +// ContextWithAuthorization returns a copy of parent associated with auth. +func ContextWithAuthorization(parent context.Context, auth Authorization) context.Context { + return context.WithValue(parent, authorizationKey{}, auth) +} + +// authorizationFromContext returns the Authorization stored in ctx, if any, +// and a boolean indicating whether there one was found. The boolean is false +// if and only if the authorization is nil. +func authorizationFromContext(ctx context.Context) (Authorization, bool) { + auth, ok := ctx.Value(authorizationKey{}).(Authorization) + return auth, ok +} + +// AuthorizedFor is a shortcut for obtaining a Handler from ctx using +// HandlerFromContext, and calling its AuthorizedFor method. AuthorizedFor +// returns an error if ctx does not contain a Handler. +func AuthorizedFor(ctx context.Context, resource Resource) (Result, error) { + auth, ok := authorizationFromContext(ctx) + if !ok { + return Result{}, ErrNoAuthorization + } + return auth.AuthorizedFor(ctx, resource) +} diff --git a/beater/authorization/deny.go b/beater/authorization/deny.go index 3d59f4b1c81..683c69aa995 100644 --- a/beater/authorization/deny.go +++ b/beater/authorization/deny.go @@ -19,16 +19,15 @@ package authorization import ( "context" - - "github.com/elastic/apm-server/elasticsearch" ) -// denyAuth implements the Authorization interface. It denies all authorization requests. +// denyAuth implements the Authorization interface. type denyAuth struct { reason string } -// AuthorizedFor always returns false -func (d denyAuth) AuthorizedFor(context.Context, elasticsearch.Resource) (Result, error) { +// AuthorizedFor always returns a Result indicating the request is neither authorized +// nor authenticated. +func (d denyAuth) AuthorizedFor(context.Context, Resource) (Result, error) { return Result{Authorized: false, Reason: d.reason}, nil } diff --git a/beater/authorization/deny_test.go b/beater/authorization/deny_test.go index 37af23c419b..29adf0e28c3 100644 --- a/beater/authorization/deny_test.go +++ b/beater/authorization/deny_test.go @@ -27,7 +27,7 @@ import ( func TestDenyAuth(t *testing.T) { handler := denyAuth{} - result, err := handler.AuthorizedFor(context.Background(), "") + result, err := handler.AuthorizedFor(context.Background(), Resource{}) assert.NoError(t, err) assert.Equal(t, Result{Authorized: false}, result) } diff --git a/beater/authprocessor.go b/beater/authprocessor.go new file mode 100644 index 00000000000..ca0090168e6 --- /dev/null +++ b/beater/authprocessor.go @@ -0,0 +1,43 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package beater + +import ( + "context" + "errors" + + "github.com/elastic/apm-server/beater/authorization" + "github.com/elastic/apm-server/model" +) + +// verifyAuthorizedFor is a model.BatchProcessor that checks authorization +// for the agent and service name in the metadata. +func verifyAuthorizedFor(ctx context.Context, meta *model.Metadata) error { + result, err := authorization.AuthorizedFor(ctx, authorization.Resource{ + AgentName: meta.Service.Agent.Name, + ServiceName: meta.Service.Name, + }) + if err != nil { + return err + } + if result.Authorized { + return nil + } + // TODO(axw) specific error type to control response code? + return errors.New(result.Reason) +} diff --git a/beater/grpcauth.go b/beater/grpcauth.go index 02bad821747..cd29bbfd6c4 100644 --- a/beater/grpcauth.go +++ b/beater/grpcauth.go @@ -45,15 +45,17 @@ func newAuthUnaryServerInterceptor(builder *authorization.Builder) grpc.UnarySer handler grpc.UnaryHandler, ) (resp interface{}, err error) { if strings.HasPrefix(info.FullMethod, "/opentelemetry") { - if err := verifyGRPCAuthorization(ctx, authHandler); err != nil { + auth, err := verifyGRPCAuthorization(ctx, authHandler) + if err != nil { return nil, err } + ctx = authorization.ContextWithAuthorization(ctx, auth) } return handler(ctx, req) } } -func verifyGRPCAuthorization(ctx context.Context, authHandler *authorization.Handler) error { +func verifyGRPCAuthorization(ctx context.Context, authHandler *authorization.Handler) (authorization.Authorization, error) { var authHeader string if md, ok := metadata.FromIncomingContext(ctx); ok { if values := md.Get(headers.Authorization); len(values) > 0 { @@ -61,16 +63,16 @@ func verifyGRPCAuthorization(ctx context.Context, authHandler *authorization.Han } } auth := authHandler.AuthorizationFor(authorization.ParseAuthorizationHeader(authHeader)) - result, err := auth.AuthorizedFor(ctx, authorization.ResourceInternal) + result, err := auth.AuthorizedFor(ctx, authorization.Resource{}) if err != nil { - return err + return nil, err } if !result.Authorized { message := "unauthorized" if result.Reason != "" { message = result.Reason } - return status.Error(codes.Unauthenticated, message) + return nil, status.Error(codes.Unauthenticated, message) } - return nil + return auth, nil } diff --git a/beater/http.go b/beater/http.go index de0c26d4a6a..a62e4ef3ee7 100644 --- a/beater/http.go +++ b/beater/http.go @@ -31,6 +31,7 @@ import ( "github.com/elastic/apm-server/beater/api" "github.com/elastic/apm-server/beater/config" "github.com/elastic/apm-server/model" + "github.com/elastic/apm-server/model/modelprocessor" "github.com/elastic/apm-server/publish" "github.com/elastic/beats/v7/libbeat/beat" "github.com/elastic/beats/v7/libbeat/common/transport/tlscommon" @@ -47,6 +48,13 @@ type httpServer struct { } func newHTTPServer(logger *logp.Logger, info beat.Info, cfg *config.Config, tracer *apm.Tracer, reporter publish.Reporter, batchProcessor model.BatchProcessor, f agentcfg.Fetcher) (*httpServer, error) { + + // Add a model processor that checks authorization for the agent and service for each event. + batchProcessor = modelprocessor.Chained{ + modelprocessor.MetadataProcessorFunc(verifyAuthorizedFor), + batchProcessor, + } + mux, err := api.NewMux(info, cfg, reporter, batchProcessor, f) if err != nil { return nil, err diff --git a/beater/jaeger/common.go b/beater/jaeger/common.go index 95e0750bc35..69d80df18c3 100644 --- a/beater/jaeger/common.go +++ b/beater/jaeger/common.go @@ -59,14 +59,14 @@ func consumeBatch( return consumer.ConsumeTraces(ctx, traces) } -type authFunc func(context.Context, model.Batch) error +type authFunc func(context.Context, model.Batch) (context.Context, error) -func noAuth(context.Context, model.Batch) error { - return nil +func noAuth(ctx context.Context, _ model.Batch) (context.Context, error) { + return ctx, nil } func makeAuthFunc(authTag string, authHandler *authorization.Handler) authFunc { - return func(ctx context.Context, batch model.Batch) error { + return func(ctx context.Context, batch model.Batch) (context.Context, error) { var kind, token string for i, kv := range batch.Process.GetTags() { if kv.Key != authTag { @@ -78,15 +78,16 @@ func makeAuthFunc(authTag string, authHandler *authorization.Handler) authFunc { break } auth := authHandler.AuthorizationFor(kind, token) - result, err := auth.AuthorizedFor(ctx, authorization.ResourceInternal) + result, err := auth.AuthorizedFor(ctx, authorization.Resource{}) if !result.Authorized { if err != nil { - return errors.Wrap(err, errNotAuthorized.Error()) + return nil, errors.Wrap(err, errNotAuthorized.Error()) } // NOTE(axw) for now at least, we do not return result.Reason in the error message, // as it refers to the "Authorization header" which is incorrect for Jaeger. - return errNotAuthorized + return nil, errNotAuthorized } - return nil + ctx = authorization.ContextWithAuthorization(ctx, auth) + return ctx, nil } } diff --git a/beater/jaeger/grpc.go b/beater/jaeger/grpc.go index 8ed96287843..4738fa0adbc 100644 --- a/beater/jaeger/grpc.go +++ b/beater/jaeger/grpc.go @@ -78,7 +78,8 @@ func (c *grpcCollector) PostSpans(ctx context.Context, r *api_v2.PostSpansReques } func (c *grpcCollector) postSpans(ctx context.Context, batch model.Batch) error { - if err := c.auth(ctx, batch); err != nil { + ctx, err := c.auth(ctx, batch) + if err != nil { gRPCCollectorMonitoringMap.inc(request.IDResponseErrorsUnauthorized) return status.Error(codes.Unauthenticated, err.Error()) } diff --git a/beater/jaeger/grpc_test.go b/beater/jaeger/grpc_test.go index d2d6881ebef..d2de61bf7fd 100644 --- a/beater/jaeger/grpc_test.go +++ b/beater/jaeger/grpc_test.go @@ -42,6 +42,8 @@ import ( "github.com/elastic/apm-server/kibana/kibanatest" ) +type authKey struct{} + func TestGRPCCollector_PostSpans(t *testing.T) { for name, tc := range map[string]testGRPCCollector{ "empty request": { @@ -50,25 +52,32 @@ func TestGRPCCollector_PostSpans(t *testing.T) { "successful request": {}, "failing request": { consumerErr: errors.New("consumer failed"), + expectedErr: errors.New("consumer failed"), }, "auth fails": { - authError: errors.New("oh noes"), + authErr: errors.New("oh noes"), + expectedErr: status.Error(codes.Unauthenticated, "oh noes"), + }, + "auth context": { + auth: func(ctx context.Context, batch model.Batch) (context.Context, error) { + return context.WithValue(ctx, authKey{}, 123), nil + }, + consumer: func(ctx context.Context, td pdata.Traces) error { + if ctx.Value(authKey{}) != 123 { + panic("auth context not propagated to consumer") + } + return nil + }, }, } { t.Run(name, func(t *testing.T) { tc.setup(t) - var expectedErr error - if tc.authError != nil { - expectedErr = status.Error(codes.Unauthenticated, tc.authError.Error()) - } else { - expectedErr = tc.consumerErr - } resp, err := tc.collector.PostSpans(context.Background(), tc.request) - if expectedErr != nil { + if tc.expectedErr != nil { require.Nil(t, resp) require.Error(t, err) - assert.Equal(t, expectedErr, err) + assert.Equal(t, tc.expectedErr, err) } else { require.NotNil(t, resp) require.NoError(t, err) @@ -79,9 +88,13 @@ func TestGRPCCollector_PostSpans(t *testing.T) { type testGRPCCollector struct { request *api_v2.PostSpansRequest - authError error + consumer tracesConsumerFunc + auth authFunc + authErr error consumerErr error collector *grpcCollector + + expectedErr error } func (tc *testGRPCCollector) setup(t *testing.T) { @@ -107,11 +120,17 @@ func (tc *testGRPCCollector) setup(t *testing.T) { tc.request = &api_v2.PostSpansRequest{Batch: *batches[0]} } - tc.collector = &grpcCollector{authFunc(func(context.Context, model.Batch) error { - return tc.authError - }), tracesConsumerFunc(func(ctx context.Context, td pdata.Traces) error { - return tc.consumerErr - })} + if tc.consumer == nil { + tc.consumer = func(ctx context.Context, td pdata.Traces) error { + return tc.consumerErr + } + } + if tc.auth == nil { + tc.auth = func(ctx context.Context, _ model.Batch) (context.Context, error) { + return ctx, tc.authErr + } + } + tc.collector = &grpcCollector{tc.auth, tc.consumer} } type tracesConsumerFunc func(ctx context.Context, td pdata.Traces) error diff --git a/beater/middleware/authorization_middleware.go b/beater/middleware/authorization_middleware.go index 9453e34c255..2de8adb25f1 100644 --- a/beater/middleware/authorization_middleware.go +++ b/beater/middleware/authorization_middleware.go @@ -36,7 +36,7 @@ func AuthorizationMiddleware(auth AuthorizationHandler, required bool) Middlewar header := c.Request.Header.Get(headers.Authorization) auth := auth.AuthorizationFor(authorization.ParseAuthorizationHeader(header)) - result, err := auth.AuthorizedFor(c.Request.Context(), authorization.ResourceInternal) + result, err := auth.AuthorizedFor(c.Request.Context(), authorization.Resource{}) if err != nil { c.Result.SetDefault(request.IDResponseErrorsServiceUnavailable) c.Result.Err = err @@ -53,8 +53,20 @@ func AuthorizationMiddleware(auth AuthorizationHandler, required bool) Middlewar return } c.AuthResult = result + c.Request = c.Request.WithContext(authorization.ContextWithAuthorization(c.Request.Context(), auth)) h(c) }, nil } } + +// AnonymousAuthorizationMiddleware returns a Middleware allowing anonymous access. +func AnonymousAuthorizationMiddleware() Middleware { + return func(h request.Handler) (request.Handler, error) { + return func(c *request.Context) { + auth := authorization.AnonymousAuth{} + c.Request = c.Request.WithContext(authorization.ContextWithAuthorization(c.Request.Context(), auth)) + h(c) + }, nil + } +} diff --git a/beater/middleware/authorization_middleware_test.go b/beater/middleware/authorization_middleware_test.go index 284b7938769..d4e5b2bb42f 100644 --- a/beater/middleware/authorization_middleware_test.go +++ b/beater/middleware/authorization_middleware_test.go @@ -33,7 +33,6 @@ import ( "github.com/elastic/apm-server/beater/config" "github.com/elastic/apm-server/beater/headers" "github.com/elastic/apm-server/beater/request" - "github.com/elastic/apm-server/elasticsearch" ) func TestAuthorizationMiddleware(t *testing.T) { @@ -124,7 +123,7 @@ func TestAuthorizationMiddleware(t *testing.T) { } func TestAuthorizationMiddlewareError(t *testing.T) { - auth := authorizationFunc(func(ctx context.Context, resource elasticsearch.Resource) (authorization.Result, error) { + auth := authorizationFunc(func(ctx context.Context, resource authorization.Resource) (authorization.Result, error) { return authorization.Result{Authorized: true}, errors.New("internal details should not be leaked") }) handler := authorizationHandlerFunc(func(kind, value string) authorization.Authorization { @@ -147,8 +146,8 @@ func (f authorizationHandlerFunc) AuthorizationFor(kind, value string) authoriza return f(kind, value) } -type authorizationFunc func(context.Context, elasticsearch.Resource) (authorization.Result, error) +type authorizationFunc func(context.Context, authorization.Resource) (authorization.Result, error) -func (f authorizationFunc) AuthorizedFor(ctx context.Context, resource elasticsearch.Resource) (authorization.Result, error) { +func (f authorizationFunc) AuthorizedFor(ctx context.Context, resource authorization.Resource) (authorization.Result, error) { return f(ctx, resource) } diff --git a/beater/server.go b/beater/server.go index 9f2789c9a2d..467af756bfd 100644 --- a/beater/server.go +++ b/beater/server.go @@ -169,6 +169,12 @@ func newGRPCServer( } } + // Add a model processor that checks authorization for the agent and service for each event. + batchProcessor = modelprocessor.Chained{ + modelprocessor.MetadataProcessorFunc(verifyAuthorizedFor), + batchProcessor, + } + jaeger.RegisterGRPCServices(srv, authBuilder, jaeger.ElasticAuthTag, logger, batchProcessor, fetcher) if err := otlp.RegisterGRPCServices(srv, batchProcessor); err != nil { return nil, err diff --git a/cmd/apikey.go b/cmd/apikey.go index f12e3cd15ca..b9f1f7bd1c4 100644 --- a/cmd/apikey.go +++ b/cmd/apikey.go @@ -315,7 +315,7 @@ PUT /_security/role/my_role { { Name: auth.Application, Privileges: privileges, - Resources: []es.Resource{auth.ResourceAny}, + Resources: []es.Resource{"*"}, }, }, }, @@ -429,7 +429,7 @@ func verifyAPIKey(config *config.Config, privileges []es.PrivilegeAction, creden result, err := builder. ForPrivilege(privilege). AuthorizationFor(headers.APIKey, credentials). - AuthorizedFor(context.Background(), auth.ResourceInternal) + AuthorizedFor(context.Background(), auth.Resource{}) if err != nil { return err }