From eef5785d0aab1b80501fd04f57ce2c33576b7c23 Mon Sep 17 00:00:00 2001 From: Emily S Date: Fri, 30 Jul 2021 13:13:26 +0200 Subject: [PATCH] [sourcemaps] Support multiple fleet addresses when requesting a sourcemap (#5770) * Send request to multiple fleet servers when fetching sourcemap * Handle case when context is canceled before any results are sent to the results channel * Update sourcemap/fleet_store.go Co-authored-by: Andrew Wilkins * Combine test for single and multiple fleet servers * Use atomic.AddInt32 to increment shared request counter var * Add test for a mix of successful and failed fleet server sourcemap requests * Simplify test for failed and successful fleet hosts fetch * Context should be first argument * Lock setting shared variable in test * Remove unnecessary called var Co-authored-by: Andrew Wilkins --- sourcemap/es_store.go | 2 +- sourcemap/fleet_store.go | 83 +++++++++++++++++++----- sourcemap/fleet_store_test.go | 118 ++++++++++++++++++++++++++++++++-- sourcemap/store.go | 7 +- 4 files changed, 185 insertions(+), 25 deletions(-) diff --git a/sourcemap/es_store.go b/sourcemap/es_store.go index ad999f4d17e..867dd4c98eb 100644 --- a/sourcemap/es_store.go +++ b/sourcemap/es_store.go @@ -42,7 +42,7 @@ const ( ) var ( - errMsgESFailure = "failure querying ES" + errMsgESFailure = errMsgFailure + " ES" errSourcemapWrongFormat = errors.New("Sourcemapping ES Result not in expected format") ) diff --git a/sourcemap/fleet_store.go b/sourcemap/fleet_store.go index ac1890cafc3..80f855d096b 100644 --- a/sourcemap/fleet_store.go +++ b/sourcemap/fleet_store.go @@ -24,6 +24,7 @@ import ( "fmt" "io/ioutil" "net/http" + "sync" "time" "github.com/pkg/errors" @@ -35,10 +36,15 @@ import ( logs "github.com/elastic/apm-server/log" ) +const defaultFleetPort = 8220 + +var errMsgFleetFailure = errMsgFailure + " fleet" + type fleetStore struct { - apikey string - c *http.Client - fleetURLs map[key]string + apikey string + c *http.Client + sourceMapURLs map[key]string + fleetBaseURLs []string } type key struct { @@ -71,35 +77,82 @@ func newFleetStore( fleetCfg *config.Fleet, cfgs []config.SourceMapMetadata, ) (fleetStore, error) { - // TODO(stn): Add support for multiple fleet hosts - // cf. https://github.com/elastic/apm-server/issues/5514 - host := fleetCfg.Hosts[0] - fleetURLs := make(map[key]string) + sourceMapURLs := make(map[key]string) + fleetBaseURLs := make([]string, len(fleetCfg.Hosts)) for _, cfg := range cfgs { k := key{cfg.ServiceName, cfg.ServiceVersion, cfg.BundleFilepath} - u, err := common.MakeURL(fleetCfg.Protocol, cfg.SourceMapURL, host, 8220) + sourceMapURLs[k] = cfg.SourceMapURL + } + + for i, host := range fleetCfg.Hosts { + baseURL, err := common.MakeURL(fleetCfg.Protocol, "", host, defaultFleetPort) if err != nil { return fleetStore{}, err } - fleetURLs[k] = u + fleetBaseURLs[i] = baseURL } + return fleetStore{ - apikey: "ApiKey " + fleetCfg.AccessAPIKey, - fleetURLs: fleetURLs, - c: c, + apikey: "ApiKey " + fleetCfg.AccessAPIKey, + fleetBaseURLs: fleetBaseURLs, + sourceMapURLs: sourceMapURLs, + c: c, }, nil } func (f fleetStore) fetch(ctx context.Context, name, version, path string) (string, error) { k := key{name, version, path} - fleetURL, ok := f.fleetURLs[k] + sourceMapURL, ok := f.sourceMapURLs[k] if !ok { return "", fmt.Errorf("unable to find sourcemap.url for service.name=%s service.version=%s bundle.path=%s", name, version, path, ) } + ctx, cancel := context.WithCancel(ctx) + defer cancel() + + type result struct { + sourcemap string + err error + } + + results := make(chan result) + var wg sync.WaitGroup + for _, baseURL := range f.fleetBaseURLs { + wg.Add(1) + go func(fleetURL string) { + defer wg.Done() + sourcemap, err := sendRequest(ctx, f, fleetURL) + select { + case <-ctx.Done(): + case results <- result{sourcemap, err}: + } + }(baseURL + sourceMapURL) + } + + go func() { + wg.Wait() + close(results) + }() + + var err error + for result := range results { + err = result.err + if err == nil { + return result.sourcemap, nil + } + } + + if err != nil { + return "", err + } + // No results were received: context was cancelled. + return "", ctx.Err() +} + +func sendRequest(ctx context.Context, f fleetStore, fleetURL string) (string, error) { req, err := http.NewRequest(http.MethodGet, fleetURL, nil) if err != nil { return "", err @@ -116,9 +169,9 @@ func (f fleetStore) fetch(ctx context.Context, name, version, path string) (stri if resp.StatusCode != http.StatusOK { body, err := ioutil.ReadAll(resp.Body) if err != nil { - return "", fmt.Errorf("failure querying fleet: statuscode=%d response=(failed to read body)", resp.StatusCode) + return "", fmt.Errorf(errMsgFleetFailure, ": statuscode=%d response=(failed to read body)", resp.StatusCode) } - return "", fmt.Errorf("failure querying fleet: statuscode=%d response=%s", resp.StatusCode, body) + return "", fmt.Errorf(errMsgFleetFailure, ": statuscode=%d response=%s", resp.StatusCode, body) } // Looking at the index in elasticsearch, currently diff --git a/sourcemap/fleet_store_test.go b/sourcemap/fleet_store_test.go index e40e7284d51..d7596205113 100644 --- a/sourcemap/fleet_store_test.go +++ b/sourcemap/fleet_store_test.go @@ -22,6 +22,8 @@ import ( "context" "net/http" "net/http/httptest" + "sync" + "sync/atomic" "testing" "github.com/elastic/apm-server/beater/config" @@ -32,7 +34,8 @@ import ( func TestFleetFetch(t *testing.T) { var ( - hasAuth bool + hasAuth = true + mu sync.Mutex apikey = "supersecret" name = "webapp" version = "1.0.0" @@ -41,19 +44,26 @@ func TestFleetFetch(t *testing.T) { sourceMapPath = "/api/fleet/artifact" ) - ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + h := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { assert.Equal(t, sourceMapPath, r.URL.Path) auth := r.Header.Get("Authorization") - hasAuth = auth == "ApiKey "+apikey + mu.Lock() + hasAuth = hasAuth && (auth == "ApiKey "+apikey) + mu.Unlock() // zlib compress wr := zlib.NewWriter(w) defer wr.Close() wr.Write([]byte(resp)) - })) - defer ts.Close() + }) + + ts0 := httptest.NewServer(h) + defer ts0.Close() + + ts1 := httptest.NewServer(h) + defer ts1.Close() fleetCfg := &config.Fleet{ - Hosts: []string{ts.URL[7:]}, + Hosts: []string{ts0.URL[7:], ts1.URL[7:]}, Protocol: "http", AccessAPIKey: apikey, TLS: nil, @@ -77,4 +87,100 @@ func TestFleetFetch(t *testing.T) { assert.True(t, hasAuth) } +func TestFailedAndSuccessfulFleetHostsFetch(t *testing.T) { + var ( + apikey = "supersecret" + name = "webapp" + version = "1.0.0" + path = "/my/path/to/bundle.js.map" + c = http.DefaultClient + sourceMapPath = "/api/fleet/artifact" + ) + + hError := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + http.Error(w, "err", http.StatusInternalServerError) + }) + ts0 := httptest.NewServer(hError) + + h := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + wr := zlib.NewWriter(w) + defer wr.Close() + wr.Write([]byte(resp)) + }) + ts1 := httptest.NewServer(h) + ts2 := httptest.NewServer(h) + + fleetCfg := &config.Fleet{ + Hosts: []string{ts0.URL[7:], ts1.URL[7:], ts2.URL[7:]}, + Protocol: "http", + AccessAPIKey: apikey, + TLS: nil, + } + + cfgs := []config.SourceMapMetadata{ + { + ServiceName: name, + ServiceVersion: version, + BundleFilepath: path, + SourceMapURL: sourceMapPath, + }, + } + f, err := newFleetStore(c, fleetCfg, cfgs) + assert.NoError(t, err) + + resp, err := f.fetch(context.Background(), name, version, path) + require.NoError(t, err) + assert.Contains(t, resp, "webpack:///bundle.js") +} + +func TestAllFailedFleetHostsFetch(t *testing.T) { + var ( + requestCount int32 + apikey = "supersecret" + name = "webapp" + version = "1.0.0" + path = "/my/path/to/bundle.js.map" + c = http.DefaultClient + sourceMapPath = "/api/fleet/artifact" + ) + + h := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + http.Error(w, "err", http.StatusInternalServerError) + atomic.AddInt32(&requestCount, 1) + }) + + ts0 := httptest.NewServer(h) + defer ts0.Close() + + ts1 := httptest.NewServer(h) + defer ts1.Close() + + ts2 := httptest.NewServer(h) + defer ts2.Close() + + fleetCfg := &config.Fleet{ + Hosts: []string{ts0.URL[7:], ts1.URL[7:], ts2.URL[7:]}, + Protocol: "http", + AccessAPIKey: apikey, + TLS: nil, + } + + cfgs := []config.SourceMapMetadata{ + { + ServiceName: name, + ServiceVersion: version, + BundleFilepath: path, + SourceMapURL: sourceMapPath, + }, + } + f, err := newFleetStore(c, fleetCfg, cfgs) + assert.NoError(t, err) + + resp, err := f.fetch(context.Background(), name, version, path) + assert.EqualValues(t, len(fleetCfg.Hosts), requestCount) + require.Error(t, err) + assert.Contains(t, err.Error(), errMsgFleetFailure) + assert.Equal(t, "", resp) +} + var resp = "{\"serviceName\":\"web-app\",\"serviceVersion\":\"1.0.0\",\"bundleFilepath\":\"/test/e2e/general-usecase/bundle.js.map\",\"sourceMap\":{\"version\":3,\"sources\":[\"webpack:///bundle.js\",\"\",\"webpack:///./scripts/index.js\",\"webpack:///./index.html\",\"webpack:///./scripts/app.js\"],\"names\":[\"modules\",\"__webpack_require__\",\"moduleId\",\"installedModules\",\"exports\",\"module\",\"id\",\"loaded\",\"call\",\"m\",\"c\",\"p\",\"foo\",\"console\",\"log\",\"foobar\"],\"mappings\":\"CAAS,SAAUA,GCInB,QAAAC,GAAAC,GAGA,GAAAC,EAAAD,GACA,MAAAC,GAAAD,GAAAE,OAGA,IAAAC,GAAAF,EAAAD,IACAE,WACAE,GAAAJ,EACAK,QAAA,EAUA,OANAP,GAAAE,GAAAM,KAAAH,EAAAD,QAAAC,IAAAD,QAAAH,GAGAI,EAAAE,QAAA,EAGAF,EAAAD,QAvBA,GAAAD,KAqCA,OATAF,GAAAQ,EAAAT,EAGAC,EAAAS,EAAAP,EAGAF,EAAAU,EAAA,GAGAV,EAAA,KDMM,SAASI,EAAQD,EAASH,GE3ChCA,EAAA,GAEAA,EAAA,GAEAW,OFmDM,SAASP,EAAQD,EAASH,GGxDhCI,EAAAD,QAAAH,EAAAU,EAAA,cH8DM,SAASN,EAAQD,GI9DvB,QAAAQ,KACAC,QAAAC,IAAAC,QAGAH\",\"file\":\"bundle.js\",\"sourcesContent\":[\"/******/ (function(modules) { // webpackBootstrap\\n/******/ \\t// The module cache\\n/******/ \\tvar installedModules = {};\\n/******/\\n/******/ \\t// The require function\\n/******/ \\tfunction __webpack_require__(moduleId) {\\n/******/\\n/******/ \\t\\t// Check if module is in cache\\n/******/ \\t\\tif(installedModules[moduleId])\\n/******/ \\t\\t\\treturn installedModules[moduleId].exports;\\n/******/\\n/******/ \\t\\t// Create a new module (and put it into the cache)\\n/******/ \\t\\tvar module = installedModules[moduleId] = {\\n/******/ \\t\\t\\texports: {},\\n/******/ \\t\\t\\tid: moduleId,\\n/******/ \\t\\t\\tloaded: false\\n/******/ \\t\\t};\\n/******/\\n/******/ \\t\\t// Execute the module function\\n/******/ \\t\\tmodules[moduleId].call(module.exports, module, module.exports, __webpack_require__);\\n/******/\\n/******/ \\t\\t// Flag the module as loaded\\n/******/ \\t\\tmodule.loaded = true;\\n/******/\\n/******/ \\t\\t// Return the exports of the module\\n/******/ \\t\\treturn module.exports;\\n/******/ \\t}\\n/******/\\n/******/\\n/******/ \\t// expose the modules object (__webpack_modules__)\\n/******/ \\t__webpack_require__.m = modules;\\n/******/\\n/******/ \\t// expose the module cache\\n/******/ \\t__webpack_require__.c = installedModules;\\n/******/\\n/******/ \\t// __webpack_public_path__\\n/******/ \\t__webpack_require__.p = \\\"\\\";\\n/******/\\n/******/ \\t// Load entry module and return exports\\n/******/ \\treturn __webpack_require__(0);\\n/******/ })\\n/************************************************************************/\\n/******/ ([\\n/* 0 */\\n/***/ function(module, exports, __webpack_require__) {\\n\\n\\t// Webpack\\n\\t__webpack_require__(1)\\n\\t\\n\\t__webpack_require__(2)\\n\\t\\n\\tfoo()\\n\\n\\n/***/ },\\n/* 1 */\\n/***/ function(module, exports, __webpack_require__) {\\n\\n\\tmodule.exports = __webpack_require__.p + \\\"index.html\\\"\\n\\n/***/ },\\n/* 2 */\\n/***/ function(module, exports) {\\n\\n\\tfunction foo() {\\n\\t console.log(foobar)\\n\\t}\\n\\t\\n\\tfoo()\\n\\n\\n/***/ }\\n/******/ ]);\\n\\n\\n/** WEBPACK FOOTER **\\n ** bundle.js\\n **/\",\" \\t// The module cache\\n \\tvar installedModules = {};\\n\\n \\t// The require function\\n \\tfunction __webpack_require__(moduleId) {\\n\\n \\t\\t// Check if module is in cache\\n \\t\\tif(installedModules[moduleId])\\n \\t\\t\\treturn installedModules[moduleId].exports;\\n\\n \\t\\t// Create a new module (and put it into the cache)\\n \\t\\tvar module = installedModules[moduleId] = {\\n \\t\\t\\texports: {},\\n \\t\\t\\tid: moduleId,\\n \\t\\t\\tloaded: false\\n \\t\\t};\\n\\n \\t\\t// Execute the module function\\n \\t\\tmodules[moduleId].call(module.exports, module, module.exports, __webpack_require__);\\n\\n \\t\\t// Flag the module as loaded\\n \\t\\tmodule.loaded = true;\\n\\n \\t\\t// Return the exports of the module\\n \\t\\treturn module.exports;\\n \\t}\\n\\n\\n \\t// expose the modules object (__webpack_modules__)\\n \\t__webpack_require__.m = modules;\\n\\n \\t// expose the module cache\\n \\t__webpack_require__.c = installedModules;\\n\\n \\t// __webpack_public_path__\\n \\t__webpack_require__.p = \\\"\\\";\\n\\n \\t// Load entry module and return exports\\n \\treturn __webpack_require__(0);\\n\\n\\n\\n/** WEBPACK FOOTER **\\n ** webpack/bootstrap 6002740481c9666b0d38\\n **/\",\"// Webpack\\nrequire('../index.html')\\n\\nrequire('./app')\\n\\nfoo()\\n\\n\\n\\n/*****************\\n ** WEBPACK FOOTER\\n ** ./scripts/index.js\\n ** module id = 0\\n ** module chunks = 0\\n **/\",\"module.exports = __webpack_public_path__ + \\\"index.html\\\"\\n\\n\\n/*****************\\n ** WEBPACK FOOTER\\n ** ./index.html\\n ** module id = 1\\n ** module chunks = 0\\n **/\",\"function foo() {\\n console.log(foobar)\\n}\\n\\nfoo()\\n\\n\\n\\n/*****************\\n ** WEBPACK FOOTER\\n ** ./scripts/app.js\\n ** module id = 2\\n ** module chunks = 0\\n **/\"],\"sourceRoot\":\"\"}}" diff --git a/sourcemap/store.go b/sourcemap/store.go index f7cdc61c74f..d351723b769 100644 --- a/sourcemap/store.go +++ b/sourcemap/store.go @@ -36,7 +36,8 @@ const ( ) var ( - errInit = errors.New("Cache cannot be initialized. Expiration and CleanupInterval need to be >= 0") + errMsgFailure = "failure querying" + errInit = errors.New("Cache cannot be initialized. Expiration and CleanupInterval need to be >= 0") ) // Store holds information necessary to fetch a sourcemap, either from an @@ -114,10 +115,10 @@ func (s *Store) Fetch(ctx context.Context, name, version, path string) (*sourcem s.mu.Unlock() }() - // fetch from Elasticsearch and ensure caching for all non-temporary results + // fetch from the store and ensure caching for all non-temporary results sourcemapStr, err := s.backend.fetch(ctx, name, version, path) if err != nil { - if !strings.Contains(err.Error(), "failure querying") { + if !strings.Contains(err.Error(), errMsgFailure) { s.add(key, nil) } return nil, err