Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[sourcemaps] Support multiple fleet addresses when requesting a sourcemap #5770

Merged
merged 10 commits into from
Jul 30, 2021
2 changes: 1 addition & 1 deletion sourcemap/es_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ const (
)

var (
errMsgESFailure = "failure querying ES"
errMsgESFailure = errMsgFailure + " ES"
errSourcemapWrongFormat = errors.New("Sourcemapping ES Result not in expected format")
)

Expand Down
83 changes: 68 additions & 15 deletions sourcemap/fleet_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"fmt"
"io/ioutil"
"net/http"
"sync"
"time"

"github.com/pkg/errors"
Expand All @@ -35,10 +36,15 @@ import (
logs "github.com/elastic/apm-server/log"
)

const defaultFleetPort = 8220

var errMsgFleetFailure = errMsgFailure + " fleet"

type fleetStore struct {
apikey string
c *http.Client
fleetURLs map[key]string
apikey string
c *http.Client
sourceMapURLs map[key]string
fleetBaseURLs []string
}

type key struct {
Expand Down Expand Up @@ -71,35 +77,82 @@ func newFleetStore(
fleetCfg *config.Fleet,
cfgs []config.SourceMapMetadata,
) (fleetStore, error) {
// TODO(stn): Add support for multiple fleet hosts
// cf. https://github.com/elastic/apm-server/issues/5514
host := fleetCfg.Hosts[0]
fleetURLs := make(map[key]string)
sourceMapURLs := make(map[key]string)
fleetBaseURLs := make([]string, len(fleetCfg.Hosts))

for _, cfg := range cfgs {
k := key{cfg.ServiceName, cfg.ServiceVersion, cfg.BundleFilepath}
u, err := common.MakeURL(fleetCfg.Protocol, cfg.SourceMapURL, host, 8220)
sourceMapURLs[k] = cfg.SourceMapURL
}

for i, host := range fleetCfg.Hosts {
baseURL, err := common.MakeURL(fleetCfg.Protocol, "", host, defaultFleetPort)
if err != nil {
return fleetStore{}, err
}
fleetURLs[k] = u
fleetBaseURLs[i] = baseURL
}

return fleetStore{
apikey: "ApiKey " + fleetCfg.AccessAPIKey,
fleetURLs: fleetURLs,
c: c,
apikey: "ApiKey " + fleetCfg.AccessAPIKey,
fleetBaseURLs: fleetBaseURLs,
sourceMapURLs: sourceMapURLs,
c: c,
}, nil
}

func (f fleetStore) fetch(ctx context.Context, name, version, path string) (string, error) {
k := key{name, version, path}
fleetURL, ok := f.fleetURLs[k]
sourceMapURL, ok := f.sourceMapURLs[k]
if !ok {
return "", fmt.Errorf("unable to find sourcemap.url for service.name=%s service.version=%s bundle.path=%s",
name, version, path,
)
}

ctx, cancel := context.WithCancel(ctx)
defer cancel()

type result struct {
sourcemap string
err error
}

results := make(chan result)
var wg sync.WaitGroup
for _, baseURL := range f.fleetBaseURLs {
wg.Add(1)
go func(fleetURL string) {
defer wg.Done()
sourcemap, err := sendRequest(ctx, f, fleetURL)
select {
case <-ctx.Done():
case results <- result{sourcemap, err}:
}
}(baseURL + sourceMapURL)
}

go func() {
wg.Wait()
close(results)
}()

var err error
for result := range results {
err = result.err
if err == nil {
return result.sourcemap, nil
}
axw marked this conversation as resolved.
Show resolved Hide resolved
}

if err != nil {
return "", err
}
// No results were received: context was cancelled.
return "", ctx.Err()
estolfo marked this conversation as resolved.
Show resolved Hide resolved
}

func sendRequest(ctx context.Context, f fleetStore, fleetURL string) (string, error) {
req, err := http.NewRequest(http.MethodGet, fleetURL, nil)
if err != nil {
return "", err
Expand All @@ -116,9 +169,9 @@ func (f fleetStore) fetch(ctx context.Context, name, version, path string) (stri
if resp.StatusCode != http.StatusOK {
body, err := ioutil.ReadAll(resp.Body)
if err != nil {
return "", fmt.Errorf("failure querying fleet: statuscode=%d response=(failed to read body)", resp.StatusCode)
return "", fmt.Errorf(errMsgFleetFailure, ": statuscode=%d response=(failed to read body)", resp.StatusCode)
}
return "", fmt.Errorf("failure querying fleet: statuscode=%d response=%s", resp.StatusCode, body)
return "", fmt.Errorf(errMsgFleetFailure, ": statuscode=%d response=%s", resp.StatusCode, body)
}

// Looking at the index in elasticsearch, currently
Expand Down
121 changes: 115 additions & 6 deletions sourcemap/fleet_store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ import (
"context"
"net/http"
"net/http/httptest"
"sync"
"sync/atomic"
"testing"

"github.com/elastic/apm-server/beater/config"
Expand All @@ -32,7 +34,8 @@ import (

func TestFleetFetch(t *testing.T) {
var (
hasAuth bool
hasAuth = true
mu sync.Mutex
apikey = "supersecret"
name = "webapp"
version = "1.0.0"
Expand All @@ -41,19 +44,26 @@ func TestFleetFetch(t *testing.T) {
sourceMapPath = "/api/fleet/artifact"
)

ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
h := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
assert.Equal(t, sourceMapPath, r.URL.Path)
auth := r.Header.Get("Authorization")
hasAuth = auth == "ApiKey "+apikey
mu.Lock()
hasAuth = hasAuth && (auth == "ApiKey "+apikey)
mu.Unlock()
// zlib compress
wr := zlib.NewWriter(w)
defer wr.Close()
wr.Write([]byte(resp))
}))
defer ts.Close()
})

ts0 := httptest.NewServer(h)
defer ts0.Close()

ts1 := httptest.NewServer(h)
defer ts1.Close()

fleetCfg := &config.Fleet{
Hosts: []string{ts.URL[7:]},
Hosts: []string{ts0.URL[7:], ts1.URL[7:]},
Protocol: "http",
AccessAPIKey: apikey,
TLS: nil,
Expand All @@ -77,4 +87,103 @@ func TestFleetFetch(t *testing.T) {
assert.True(t, hasAuth)
}

func TestFailedAndSuccessfulFleetHostsFetch(t *testing.T) {
var (
called int32
apikey = "supersecret"
name = "webapp"
version = "1.0.0"
path = "/my/path/to/bundle.js.map"
c = http.DefaultClient
sourceMapPath = "/api/fleet/artifact"
)

hError := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
atomic.AddInt32(&called, 1)
http.Error(w, "err", http.StatusInternalServerError)
})
ts0 := httptest.NewServer(hError)

h := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
atomic.AddInt32(&called, 1)
wr := zlib.NewWriter(w)
defer wr.Close()
wr.Write([]byte(resp))
})
ts1 := httptest.NewServer(h)
ts2 := httptest.NewServer(h)

fleetCfg := &config.Fleet{
Hosts: []string{ts0.URL[7:], ts1.URL[7:], ts2.URL[7:]},
Protocol: "http",
AccessAPIKey: apikey,
TLS: nil,
}

cfgs := []config.SourceMapMetadata{
{
ServiceName: name,
ServiceVersion: version,
BundleFilepath: path,
SourceMapURL: sourceMapPath,
},
}
f, err := newFleetStore(c, fleetCfg, cfgs)
assert.NoError(t, err)

resp, err := f.fetch(context.Background(), name, version, path)
require.NoError(t, err)
assert.Contains(t, resp, "webpack:///bundle.js")
}

func TestAllFailedFleetHostsFetch(t *testing.T) {
var (
requestCount int32
apikey = "supersecret"
name = "webapp"
version = "1.0.0"
path = "/my/path/to/bundle.js.map"
c = http.DefaultClient
sourceMapPath = "/api/fleet/artifact"
)

h := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
http.Error(w, "err", http.StatusInternalServerError)
atomic.AddInt32(&requestCount, 1)
})

ts0 := httptest.NewServer(h)
defer ts0.Close()

ts1 := httptest.NewServer(h)
defer ts1.Close()

ts2 := httptest.NewServer(h)
defer ts2.Close()

fleetCfg := &config.Fleet{
Hosts: []string{ts0.URL[7:], ts1.URL[7:], ts2.URL[7:]},
Protocol: "http",
AccessAPIKey: apikey,
TLS: nil,
}

cfgs := []config.SourceMapMetadata{
{
ServiceName: name,
ServiceVersion: version,
BundleFilepath: path,
SourceMapURL: sourceMapPath,
},
}
f, err := newFleetStore(c, fleetCfg, cfgs)
assert.NoError(t, err)

resp, err := f.fetch(context.Background(), name, version, path)
assert.EqualValues(t, len(fleetCfg.Hosts), requestCount)
require.Error(t, err)
assert.Contains(t, err.Error(), errMsgFleetFailure)
assert.Equal(t, "", resp)
}

var resp = "{\"serviceName\":\"web-app\",\"serviceVersion\":\"1.0.0\",\"bundleFilepath\":\"/test/e2e/general-usecase/bundle.js.map\",\"sourceMap\":{\"version\":3,\"sources\":[\"webpack:///bundle.js\",\"\",\"webpack:///./scripts/index.js\",\"webpack:///./index.html\",\"webpack:///./scripts/app.js\"],\"names\":[\"modules\",\"__webpack_require__\",\"moduleId\",\"installedModules\",\"exports\",\"module\",\"id\",\"loaded\",\"call\",\"m\",\"c\",\"p\",\"foo\",\"console\",\"log\",\"foobar\"],\"mappings\":\"CAAS,SAAUA,GCInB,QAAAC,GAAAC,GAGA,GAAAC,EAAAD,GACA,MAAAC,GAAAD,GAAAE,OAGA,IAAAC,GAAAF,EAAAD,IACAE,WACAE,GAAAJ,EACAK,QAAA,EAUA,OANAP,GAAAE,GAAAM,KAAAH,EAAAD,QAAAC,IAAAD,QAAAH,GAGAI,EAAAE,QAAA,EAGAF,EAAAD,QAvBA,GAAAD,KAqCA,OATAF,GAAAQ,EAAAT,EAGAC,EAAAS,EAAAP,EAGAF,EAAAU,EAAA,GAGAV,EAAA,KDMM,SAASI,EAAQD,EAASH,GE3ChCA,EAAA,GAEAA,EAAA,GAEAW,OFmDM,SAASP,EAAQD,EAASH,GGxDhCI,EAAAD,QAAAH,EAAAU,EAAA,cH8DM,SAASN,EAAQD,GI9DvB,QAAAQ,KACAC,QAAAC,IAAAC,QAGAH\",\"file\":\"bundle.js\",\"sourcesContent\":[\"/******/ (function(modules) { // webpackBootstrap\\n/******/ \\t// The module cache\\n/******/ \\tvar installedModules = {};\\n/******/\\n/******/ \\t// The require function\\n/******/ \\tfunction __webpack_require__(moduleId) {\\n/******/\\n/******/ \\t\\t// Check if module is in cache\\n/******/ \\t\\tif(installedModules[moduleId])\\n/******/ \\t\\t\\treturn installedModules[moduleId].exports;\\n/******/\\n/******/ \\t\\t// Create a new module (and put it into the cache)\\n/******/ \\t\\tvar module = installedModules[moduleId] = {\\n/******/ \\t\\t\\texports: {},\\n/******/ \\t\\t\\tid: moduleId,\\n/******/ \\t\\t\\tloaded: false\\n/******/ \\t\\t};\\n/******/\\n/******/ \\t\\t// Execute the module function\\n/******/ \\t\\tmodules[moduleId].call(module.exports, module, module.exports, __webpack_require__);\\n/******/\\n/******/ \\t\\t// Flag the module as loaded\\n/******/ \\t\\tmodule.loaded = true;\\n/******/\\n/******/ \\t\\t// Return the exports of the module\\n/******/ \\t\\treturn module.exports;\\n/******/ \\t}\\n/******/\\n/******/\\n/******/ \\t// expose the modules object (__webpack_modules__)\\n/******/ \\t__webpack_require__.m = modules;\\n/******/\\n/******/ \\t// expose the module cache\\n/******/ \\t__webpack_require__.c = installedModules;\\n/******/\\n/******/ \\t// __webpack_public_path__\\n/******/ \\t__webpack_require__.p = \\\"\\\";\\n/******/\\n/******/ \\t// Load entry module and return exports\\n/******/ \\treturn __webpack_require__(0);\\n/******/ })\\n/************************************************************************/\\n/******/ ([\\n/* 0 */\\n/***/ function(module, exports, __webpack_require__) {\\n\\n\\t// Webpack\\n\\t__webpack_require__(1)\\n\\t\\n\\t__webpack_require__(2)\\n\\t\\n\\tfoo()\\n\\n\\n/***/ },\\n/* 1 */\\n/***/ function(module, exports, __webpack_require__) {\\n\\n\\tmodule.exports = __webpack_require__.p + \\\"index.html\\\"\\n\\n/***/ },\\n/* 2 */\\n/***/ function(module, exports) {\\n\\n\\tfunction foo() {\\n\\t console.log(foobar)\\n\\t}\\n\\t\\n\\tfoo()\\n\\n\\n/***/ }\\n/******/ ]);\\n\\n\\n/** WEBPACK FOOTER **\\n ** bundle.js\\n **/\",\" \\t// The module cache\\n \\tvar installedModules = {};\\n\\n \\t// The require function\\n \\tfunction __webpack_require__(moduleId) {\\n\\n \\t\\t// Check if module is in cache\\n \\t\\tif(installedModules[moduleId])\\n \\t\\t\\treturn installedModules[moduleId].exports;\\n\\n \\t\\t// Create a new module (and put it into the cache)\\n \\t\\tvar module = installedModules[moduleId] = {\\n \\t\\t\\texports: {},\\n \\t\\t\\tid: moduleId,\\n \\t\\t\\tloaded: false\\n \\t\\t};\\n\\n \\t\\t// Execute the module function\\n \\t\\tmodules[moduleId].call(module.exports, module, module.exports, __webpack_require__);\\n\\n \\t\\t// Flag the module as loaded\\n \\t\\tmodule.loaded = true;\\n\\n \\t\\t// Return the exports of the module\\n \\t\\treturn module.exports;\\n \\t}\\n\\n\\n \\t// expose the modules object (__webpack_modules__)\\n \\t__webpack_require__.m = modules;\\n\\n \\t// expose the module cache\\n \\t__webpack_require__.c = installedModules;\\n\\n \\t// __webpack_public_path__\\n \\t__webpack_require__.p = \\\"\\\";\\n\\n \\t// Load entry module and return exports\\n \\treturn __webpack_require__(0);\\n\\n\\n\\n/** WEBPACK FOOTER **\\n ** webpack/bootstrap 6002740481c9666b0d38\\n **/\",\"// Webpack\\nrequire('../index.html')\\n\\nrequire('./app')\\n\\nfoo()\\n\\n\\n\\n/*****************\\n ** WEBPACK FOOTER\\n ** ./scripts/index.js\\n ** module id = 0\\n ** module chunks = 0\\n **/\",\"module.exports = __webpack_public_path__ + \\\"index.html\\\"\\n\\n\\n/*****************\\n ** WEBPACK FOOTER\\n ** ./index.html\\n ** module id = 1\\n ** module chunks = 0\\n **/\",\"function foo() {\\n console.log(foobar)\\n}\\n\\nfoo()\\n\\n\\n\\n/*****************\\n ** WEBPACK FOOTER\\n ** ./scripts/app.js\\n ** module id = 2\\n ** module chunks = 0\\n **/\"],\"sourceRoot\":\"\"}}"
7 changes: 4 additions & 3 deletions sourcemap/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,8 @@ const (
)

var (
errInit = errors.New("Cache cannot be initialized. Expiration and CleanupInterval need to be >= 0")
errMsgFailure = "failure querying"
errInit = errors.New("Cache cannot be initialized. Expiration and CleanupInterval need to be >= 0")
)

// Store holds information necessary to fetch a sourcemap, either from an
Expand Down Expand Up @@ -114,10 +115,10 @@ func (s *Store) Fetch(ctx context.Context, name, version, path string) (*sourcem
s.mu.Unlock()
}()

// fetch from Elasticsearch and ensure caching for all non-temporary results
// fetch from the store and ensure caching for all non-temporary results
sourcemapStr, err := s.backend.fetch(ctx, name, version, path)
if err != nil {
if !strings.Contains(err.Error(), "failure querying") {
if !strings.Contains(err.Error(), errMsgFailure) {
s.add(key, nil)
}
return nil, err
Expand Down