Skip to content

Commit

Permalink
[sourcemaps] Support multiple fleet addresses when requesting a sourc…
Browse files Browse the repository at this point in the history
…emap (elastic#5770)

* Send request to multiple fleet servers when fetching sourcemap

* Handle case when context is canceled before any results are sent to the results channel

* Update sourcemap/fleet_store.go

Co-authored-by: Andrew Wilkins <[email protected]>

* Combine test for single and multiple fleet servers

* Use atomic.AddInt32 to increment shared request counter var

* Add test for a mix of successful and failed fleet server sourcemap requests

* Simplify test for failed and successful fleet hosts fetch

* Context should be first argument

* Lock setting shared variable in test

* Remove unnecessary called var

Co-authored-by: Andrew Wilkins <[email protected]>
  • Loading branch information
estolfo and axw committed Aug 2, 2021
1 parent 3836710 commit eef5785
Show file tree
Hide file tree
Showing 4 changed files with 185 additions and 25 deletions.
2 changes: 1 addition & 1 deletion sourcemap/es_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ const (
)

var (
errMsgESFailure = "failure querying ES"
errMsgESFailure = errMsgFailure + " ES"
errSourcemapWrongFormat = errors.New("Sourcemapping ES Result not in expected format")
)

Expand Down
83 changes: 68 additions & 15 deletions sourcemap/fleet_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"fmt"
"io/ioutil"
"net/http"
"sync"
"time"

"github.com/pkg/errors"
Expand All @@ -35,10 +36,15 @@ import (
logs "github.com/elastic/apm-server/log"
)

const defaultFleetPort = 8220

var errMsgFleetFailure = errMsgFailure + " fleet"

type fleetStore struct {
apikey string
c *http.Client
fleetURLs map[key]string
apikey string
c *http.Client
sourceMapURLs map[key]string
fleetBaseURLs []string
}

type key struct {
Expand Down Expand Up @@ -71,35 +77,82 @@ func newFleetStore(
fleetCfg *config.Fleet,
cfgs []config.SourceMapMetadata,
) (fleetStore, error) {
// TODO(stn): Add support for multiple fleet hosts
// cf. https://github.com/elastic/apm-server/issues/5514
host := fleetCfg.Hosts[0]
fleetURLs := make(map[key]string)
sourceMapURLs := make(map[key]string)
fleetBaseURLs := make([]string, len(fleetCfg.Hosts))

for _, cfg := range cfgs {
k := key{cfg.ServiceName, cfg.ServiceVersion, cfg.BundleFilepath}
u, err := common.MakeURL(fleetCfg.Protocol, cfg.SourceMapURL, host, 8220)
sourceMapURLs[k] = cfg.SourceMapURL
}

for i, host := range fleetCfg.Hosts {
baseURL, err := common.MakeURL(fleetCfg.Protocol, "", host, defaultFleetPort)
if err != nil {
return fleetStore{}, err
}
fleetURLs[k] = u
fleetBaseURLs[i] = baseURL
}

return fleetStore{
apikey: "ApiKey " + fleetCfg.AccessAPIKey,
fleetURLs: fleetURLs,
c: c,
apikey: "ApiKey " + fleetCfg.AccessAPIKey,
fleetBaseURLs: fleetBaseURLs,
sourceMapURLs: sourceMapURLs,
c: c,
}, nil
}

func (f fleetStore) fetch(ctx context.Context, name, version, path string) (string, error) {
k := key{name, version, path}
fleetURL, ok := f.fleetURLs[k]
sourceMapURL, ok := f.sourceMapURLs[k]
if !ok {
return "", fmt.Errorf("unable to find sourcemap.url for service.name=%s service.version=%s bundle.path=%s",
name, version, path,
)
}

ctx, cancel := context.WithCancel(ctx)
defer cancel()

type result struct {
sourcemap string
err error
}

results := make(chan result)
var wg sync.WaitGroup
for _, baseURL := range f.fleetBaseURLs {
wg.Add(1)
go func(fleetURL string) {
defer wg.Done()
sourcemap, err := sendRequest(ctx, f, fleetURL)
select {
case <-ctx.Done():
case results <- result{sourcemap, err}:
}
}(baseURL + sourceMapURL)
}

go func() {
wg.Wait()
close(results)
}()

var err error
for result := range results {
err = result.err
if err == nil {
return result.sourcemap, nil
}
}

if err != nil {
return "", err
}
// No results were received: context was cancelled.
return "", ctx.Err()
}

func sendRequest(ctx context.Context, f fleetStore, fleetURL string) (string, error) {
req, err := http.NewRequest(http.MethodGet, fleetURL, nil)
if err != nil {
return "", err
Expand All @@ -116,9 +169,9 @@ func (f fleetStore) fetch(ctx context.Context, name, version, path string) (stri
if resp.StatusCode != http.StatusOK {
body, err := ioutil.ReadAll(resp.Body)
if err != nil {
return "", fmt.Errorf("failure querying fleet: statuscode=%d response=(failed to read body)", resp.StatusCode)
return "", fmt.Errorf(errMsgFleetFailure, ": statuscode=%d response=(failed to read body)", resp.StatusCode)
}
return "", fmt.Errorf("failure querying fleet: statuscode=%d response=%s", resp.StatusCode, body)
return "", fmt.Errorf(errMsgFleetFailure, ": statuscode=%d response=%s", resp.StatusCode, body)
}

// Looking at the index in elasticsearch, currently
Expand Down
118 changes: 112 additions & 6 deletions sourcemap/fleet_store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ import (
"context"
"net/http"
"net/http/httptest"
"sync"
"sync/atomic"
"testing"

"github.com/elastic/apm-server/beater/config"
Expand All @@ -32,7 +34,8 @@ import (

func TestFleetFetch(t *testing.T) {
var (
hasAuth bool
hasAuth = true
mu sync.Mutex
apikey = "supersecret"
name = "webapp"
version = "1.0.0"
Expand All @@ -41,19 +44,26 @@ func TestFleetFetch(t *testing.T) {
sourceMapPath = "/api/fleet/artifact"
)

ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
h := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
assert.Equal(t, sourceMapPath, r.URL.Path)
auth := r.Header.Get("Authorization")
hasAuth = auth == "ApiKey "+apikey
mu.Lock()
hasAuth = hasAuth && (auth == "ApiKey "+apikey)
mu.Unlock()
// zlib compress
wr := zlib.NewWriter(w)
defer wr.Close()
wr.Write([]byte(resp))
}))
defer ts.Close()
})

ts0 := httptest.NewServer(h)
defer ts0.Close()

ts1 := httptest.NewServer(h)
defer ts1.Close()

fleetCfg := &config.Fleet{
Hosts: []string{ts.URL[7:]},
Hosts: []string{ts0.URL[7:], ts1.URL[7:]},
Protocol: "http",
AccessAPIKey: apikey,
TLS: nil,
Expand All @@ -77,4 +87,100 @@ func TestFleetFetch(t *testing.T) {
assert.True(t, hasAuth)
}

func TestFailedAndSuccessfulFleetHostsFetch(t *testing.T) {
var (
apikey = "supersecret"
name = "webapp"
version = "1.0.0"
path = "/my/path/to/bundle.js.map"
c = http.DefaultClient
sourceMapPath = "/api/fleet/artifact"
)

hError := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
http.Error(w, "err", http.StatusInternalServerError)
})
ts0 := httptest.NewServer(hError)

h := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
wr := zlib.NewWriter(w)
defer wr.Close()
wr.Write([]byte(resp))
})
ts1 := httptest.NewServer(h)
ts2 := httptest.NewServer(h)

fleetCfg := &config.Fleet{
Hosts: []string{ts0.URL[7:], ts1.URL[7:], ts2.URL[7:]},
Protocol: "http",
AccessAPIKey: apikey,
TLS: nil,
}

cfgs := []config.SourceMapMetadata{
{
ServiceName: name,
ServiceVersion: version,
BundleFilepath: path,
SourceMapURL: sourceMapPath,
},
}
f, err := newFleetStore(c, fleetCfg, cfgs)
assert.NoError(t, err)

resp, err := f.fetch(context.Background(), name, version, path)
require.NoError(t, err)
assert.Contains(t, resp, "webpack:///bundle.js")
}

func TestAllFailedFleetHostsFetch(t *testing.T) {
var (
requestCount int32
apikey = "supersecret"
name = "webapp"
version = "1.0.0"
path = "/my/path/to/bundle.js.map"
c = http.DefaultClient
sourceMapPath = "/api/fleet/artifact"
)

h := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
http.Error(w, "err", http.StatusInternalServerError)
atomic.AddInt32(&requestCount, 1)
})

ts0 := httptest.NewServer(h)
defer ts0.Close()

ts1 := httptest.NewServer(h)
defer ts1.Close()

ts2 := httptest.NewServer(h)
defer ts2.Close()

fleetCfg := &config.Fleet{
Hosts: []string{ts0.URL[7:], ts1.URL[7:], ts2.URL[7:]},
Protocol: "http",
AccessAPIKey: apikey,
TLS: nil,
}

cfgs := []config.SourceMapMetadata{
{
ServiceName: name,
ServiceVersion: version,
BundleFilepath: path,
SourceMapURL: sourceMapPath,
},
}
f, err := newFleetStore(c, fleetCfg, cfgs)
assert.NoError(t, err)

resp, err := f.fetch(context.Background(), name, version, path)
assert.EqualValues(t, len(fleetCfg.Hosts), requestCount)
require.Error(t, err)
assert.Contains(t, err.Error(), errMsgFleetFailure)
assert.Equal(t, "", resp)
}

var resp = "{\"serviceName\":\"web-app\",\"serviceVersion\":\"1.0.0\",\"bundleFilepath\":\"/test/e2e/general-usecase/bundle.js.map\",\"sourceMap\":{\"version\":3,\"sources\":[\"webpack:///bundle.js\",\"\",\"webpack:///./scripts/index.js\",\"webpack:///./index.html\",\"webpack:///./scripts/app.js\"],\"names\":[\"modules\",\"__webpack_require__\",\"moduleId\",\"installedModules\",\"exports\",\"module\",\"id\",\"loaded\",\"call\",\"m\",\"c\",\"p\",\"foo\",\"console\",\"log\",\"foobar\"],\"mappings\":\"CAAS,SAAUA,GCInB,QAAAC,GAAAC,GAGA,GAAAC,EAAAD,GACA,MAAAC,GAAAD,GAAAE,OAGA,IAAAC,GAAAF,EAAAD,IACAE,WACAE,GAAAJ,EACAK,QAAA,EAUA,OANAP,GAAAE,GAAAM,KAAAH,EAAAD,QAAAC,IAAAD,QAAAH,GAGAI,EAAAE,QAAA,EAGAF,EAAAD,QAvBA,GAAAD,KAqCA,OATAF,GAAAQ,EAAAT,EAGAC,EAAAS,EAAAP,EAGAF,EAAAU,EAAA,GAGAV,EAAA,KDMM,SAASI,EAAQD,EAASH,GE3ChCA,EAAA,GAEAA,EAAA,GAEAW,OFmDM,SAASP,EAAQD,EAASH,GGxDhCI,EAAAD,QAAAH,EAAAU,EAAA,cH8DM,SAASN,EAAQD,GI9DvB,QAAAQ,KACAC,QAAAC,IAAAC,QAGAH\",\"file\":\"bundle.js\",\"sourcesContent\":[\"/******/ (function(modules) { // webpackBootstrap\\n/******/ \\t// The module cache\\n/******/ \\tvar installedModules = {};\\n/******/\\n/******/ \\t// The require function\\n/******/ \\tfunction __webpack_require__(moduleId) {\\n/******/\\n/******/ \\t\\t// Check if module is in cache\\n/******/ \\t\\tif(installedModules[moduleId])\\n/******/ \\t\\t\\treturn installedModules[moduleId].exports;\\n/******/\\n/******/ \\t\\t// Create a new module (and put it into the cache)\\n/******/ \\t\\tvar module = installedModules[moduleId] = {\\n/******/ \\t\\t\\texports: {},\\n/******/ \\t\\t\\tid: moduleId,\\n/******/ \\t\\t\\tloaded: false\\n/******/ \\t\\t};\\n/******/\\n/******/ \\t\\t// Execute the module function\\n/******/ \\t\\tmodules[moduleId].call(module.exports, module, module.exports, __webpack_require__);\\n/******/\\n/******/ \\t\\t// Flag the module as loaded\\n/******/ \\t\\tmodule.loaded = true;\\n/******/\\n/******/ \\t\\t// Return the exports of the module\\n/******/ \\t\\treturn module.exports;\\n/******/ \\t}\\n/******/\\n/******/\\n/******/ \\t// expose the modules object (__webpack_modules__)\\n/******/ \\t__webpack_require__.m = modules;\\n/******/\\n/******/ \\t// expose the module cache\\n/******/ \\t__webpack_require__.c = installedModules;\\n/******/\\n/******/ \\t// __webpack_public_path__\\n/******/ \\t__webpack_require__.p = \\\"\\\";\\n/******/\\n/******/ \\t// Load entry module and return exports\\n/******/ \\treturn __webpack_require__(0);\\n/******/ })\\n/************************************************************************/\\n/******/ ([\\n/* 0 */\\n/***/ function(module, exports, __webpack_require__) {\\n\\n\\t// Webpack\\n\\t__webpack_require__(1)\\n\\t\\n\\t__webpack_require__(2)\\n\\t\\n\\tfoo()\\n\\n\\n/***/ },\\n/* 1 */\\n/***/ function(module, exports, __webpack_require__) {\\n\\n\\tmodule.exports = __webpack_require__.p + \\\"index.html\\\"\\n\\n/***/ },\\n/* 2 */\\n/***/ function(module, exports) {\\n\\n\\tfunction foo() {\\n\\t console.log(foobar)\\n\\t}\\n\\t\\n\\tfoo()\\n\\n\\n/***/ }\\n/******/ ]);\\n\\n\\n/** WEBPACK FOOTER **\\n ** bundle.js\\n **/\",\" \\t// The module cache\\n \\tvar installedModules = {};\\n\\n \\t// The require function\\n \\tfunction __webpack_require__(moduleId) {\\n\\n \\t\\t// Check if module is in cache\\n \\t\\tif(installedModules[moduleId])\\n \\t\\t\\treturn installedModules[moduleId].exports;\\n\\n \\t\\t// Create a new module (and put it into the cache)\\n \\t\\tvar module = installedModules[moduleId] = {\\n \\t\\t\\texports: {},\\n \\t\\t\\tid: moduleId,\\n \\t\\t\\tloaded: false\\n \\t\\t};\\n\\n \\t\\t// Execute the module function\\n \\t\\tmodules[moduleId].call(module.exports, module, module.exports, __webpack_require__);\\n\\n \\t\\t// Flag the module as loaded\\n \\t\\tmodule.loaded = true;\\n\\n \\t\\t// Return the exports of the module\\n \\t\\treturn module.exports;\\n \\t}\\n\\n\\n \\t// expose the modules object (__webpack_modules__)\\n \\t__webpack_require__.m = modules;\\n\\n \\t// expose the module cache\\n \\t__webpack_require__.c = installedModules;\\n\\n \\t// __webpack_public_path__\\n \\t__webpack_require__.p = \\\"\\\";\\n\\n \\t// Load entry module and return exports\\n \\treturn __webpack_require__(0);\\n\\n\\n\\n/** WEBPACK FOOTER **\\n ** webpack/bootstrap 6002740481c9666b0d38\\n **/\",\"// Webpack\\nrequire('../index.html')\\n\\nrequire('./app')\\n\\nfoo()\\n\\n\\n\\n/*****************\\n ** WEBPACK FOOTER\\n ** ./scripts/index.js\\n ** module id = 0\\n ** module chunks = 0\\n **/\",\"module.exports = __webpack_public_path__ + \\\"index.html\\\"\\n\\n\\n/*****************\\n ** WEBPACK FOOTER\\n ** ./index.html\\n ** module id = 1\\n ** module chunks = 0\\n **/\",\"function foo() {\\n console.log(foobar)\\n}\\n\\nfoo()\\n\\n\\n\\n/*****************\\n ** WEBPACK FOOTER\\n ** ./scripts/app.js\\n ** module id = 2\\n ** module chunks = 0\\n **/\"],\"sourceRoot\":\"\"}}"
7 changes: 4 additions & 3 deletions sourcemap/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,8 @@ const (
)

var (
errInit = errors.New("Cache cannot be initialized. Expiration and CleanupInterval need to be >= 0")
errMsgFailure = "failure querying"
errInit = errors.New("Cache cannot be initialized. Expiration and CleanupInterval need to be >= 0")
)

// Store holds information necessary to fetch a sourcemap, either from an
Expand Down Expand Up @@ -114,10 +115,10 @@ func (s *Store) Fetch(ctx context.Context, name, version, path string) (*sourcem
s.mu.Unlock()
}()

// fetch from Elasticsearch and ensure caching for all non-temporary results
// fetch from the store and ensure caching for all non-temporary results
sourcemapStr, err := s.backend.fetch(ctx, name, version, path)
if err != nil {
if !strings.Contains(err.Error(), "failure querying") {
if !strings.Contains(err.Error(), errMsgFailure) {
s.add(key, nil)
}
return nil, err
Expand Down

0 comments on commit eef5785

Please sign in to comment.