Skip to content

Commit

Permalink
Remove sourcemap/test and elasticsearch/estest (#5854)
Browse files Browse the repository at this point in the history
elasticsearch/estest was only used inside the sourcemap
package's tests. Remove this package and refactor tests
to use an httptest.Server where feasible.

Change the beater test that was using sourcemap/test
for test data to use the sourcemap in testdata/sourcemap
instead, and remove sourcemap/test.

(cherry picked from commit 7ccc202)

# Conflicts:
#	elasticsearch/estest/client.go
  • Loading branch information
axw authored and mergify-bot committed Aug 2, 2021
1 parent 3ddcd32 commit a971d02
Show file tree
Hide file tree
Showing 6 changed files with 190 additions and 235 deletions.
8 changes: 5 additions & 3 deletions beater/beater_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"context"
"errors"
"fmt"
"io/ioutil"
"net"
"net/http"
"net/http/httptest"
Expand All @@ -40,7 +41,6 @@ import (
"github.com/elastic/apm-server/beater/config"
"github.com/elastic/apm-server/elasticsearch"
"github.com/elastic/apm-server/model"
"github.com/elastic/apm-server/sourcemap/test"
"github.com/elastic/beats/v7/libbeat/beat"
"github.com/elastic/beats/v7/libbeat/common"
"github.com/elastic/beats/v7/libbeat/instrumentation"
Expand Down Expand Up @@ -268,11 +268,13 @@ func TestTransformConfigIndex(t *testing.T) {
t.Run("with-observer-version", func(t *testing.T) { test(t, "blah-%{[observer.version]}-blah", "blah-1.2.3-blah") })
}

var validSourcemap, _ = ioutil.ReadFile("../testdata/sourcemap/bundle.js.map")

func TestStoreUsesRUMElasticsearchConfig(t *testing.T) {
var called bool
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
called = true
w.Write([]byte(test.ValidSourcemap))
w.Write(validSourcemap)
}))
defer ts.Close()

Expand All @@ -298,7 +300,7 @@ func TestFleetStoreUsed(t *testing.T) {
called = true
wr := zlib.NewWriter(w)
defer wr.Close()
wr.Write([]byte(fmt.Sprintf(`{"sourceMap":%s}`, test.ValidSourcemap)))
wr.Write([]byte(fmt.Sprintf(`{"sourceMap":%s}`, validSourcemap)))
}))
defer ts.Close()

Expand Down
169 changes: 142 additions & 27 deletions sourcemap/es_store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,55 +18,60 @@
package sourcemap

import (
"bytes"
"context"
"encoding/json"
"io"
"net/http"
"net/http/httptest"
"testing"
"time"

"github.com/go-sourcemap/sourcemap"

"github.com/pkg/errors"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/elastic/beats/v7/libbeat/logp"

"github.com/elastic/apm-server/elasticsearch"

"github.com/elastic/apm-server/elasticsearch/estest"
logs "github.com/elastic/apm-server/log"
"github.com/elastic/apm-server/sourcemap/test"
)

func Test_esFetcher_fetchError(t *testing.T) {
for name, tc := range map[string]struct {
statusCode int
esBody map[string]interface{}
temporary bool
statusCode int
clientError bool
responseBody io.Reader
temporary bool
}{
"es not reachable": {
statusCode: -1, temporary: true,
clientError: true,
temporary: true,
},
"es bad request": {
statusCode: http.StatusBadRequest,
},
"empty sourcemap string": {
esBody: map[string]interface{}{
"hits": map[string]interface{}{
"total": map[string]interface{}{"value": 1},
"hits": []map[string]interface{}{
{"_source": map[string]interface{}{
"sourcemap": map[string]interface{}{
"sourcemap": ""}}}}}},
statusCode: http.StatusOK,
responseBody: sourcemapSearchResponseBody(1, []map[string]interface{}{{
"_source": map[string]interface{}{
"sourcemap": map[string]interface{}{
"sourcemap": "",
},
},
}}),
},
} {
t.Run(name, func(t *testing.T) {
statusCode := tc.statusCode
if statusCode == 0 {
statusCode = http.StatusOK
var client elasticsearch.Client
if tc.clientError {
client = newUnavailableElasticsearchClient(t)
} else {
client = newMockElasticsearchClient(t, tc.statusCode, tc.responseBody)
}
client, err := estest.NewElasticsearchClient(estest.NewTransport(t, statusCode, tc.esBody))
require.NoError(t, err)

consumer, err := testESStore(client).fetch(context.Background(), "abc", "1.0", "/tmp")
require.Error(t, err)
if tc.temporary {
assert.Contains(t, err.Error(), errMsgESFailure)
} else {
Expand All @@ -79,15 +84,27 @@ func Test_esFetcher_fetchError(t *testing.T) {

func Test_esFetcher_fetch(t *testing.T) {
for name, tc := range map[string]struct {
client elasticsearch.Client
filePath string
statusCode int
responseBody io.Reader
filePath string
}{
"no sourcemap found": {client: test.ESClientWithSourcemapNotFound(t)},
"sourcemap indicated but not found": {client: test.ESClientWithSourcemapIndicatedNotFound(t)},
"valid sourcemap found": {client: test.ESClientWithValidSourcemap(t), filePath: "bundle.js"},
"no sourcemap found": {
statusCode: http.StatusNotFound,
responseBody: sourcemapSearchResponseBody(0, nil),
},
"sourcemap indicated but not found": {
statusCode: http.StatusOK,
responseBody: sourcemapSearchResponseBody(1, []map[string]interface{}{}),
},
"valid sourcemap found": {
statusCode: http.StatusOK,
responseBody: sourcemapSearchResponseBody(1, []map[string]interface{}{sourcemapHit(validSourcemap)}),
filePath: "bundle.js",
},
} {
t.Run(name, func(t *testing.T) {
sourcemapStr, err := testESStore(tc.client).fetch(context.Background(), "abc", "1.0", "/tmp")
client := newMockElasticsearchClient(t, tc.statusCode, tc.responseBody)
sourcemapStr, err := testESStore(client).fetch(context.Background(), "abc", "1.0", "/tmp")
require.NoError(t, err)

if tc.filePath == "" {
Expand All @@ -104,3 +121,101 @@ func Test_esFetcher_fetch(t *testing.T) {
func testESStore(client elasticsearch.Client) *esStore {
return &esStore{client: client, index: "apm-sourcemap", logger: logp.NewLogger(logs.Sourcemap)}
}

func sourcemapSearchResponseBody(hitsTotal int, hits []map[string]interface{}) io.Reader {
resultHits := map[string]interface{}{
"total": map[string]interface{}{
"value": hitsTotal,
},
}
if hits != nil {
resultHits["hits"] = hits
}
result := map[string]interface{}{"hits": resultHits}
data, err := json.Marshal(result)
if err != nil {
panic(err)
}
return bytes.NewReader(data)
}

func sourcemapHit(sourcemap string) map[string]interface{} {
return map[string]interface{}{
"_source": map[string]interface{}{
"sourcemap": map[string]interface{}{
"sourcemap": sourcemap,
},
},
}
}

// newUnavailableElasticsearchClient returns an elasticsearch.Client configured
// to send requests to an invalid (unavailable) host.
func newUnavailableElasticsearchClient(t testing.TB) elasticsearch.Client {
var transport roundTripperFunc = func(r *http.Request) (*http.Response, error) {
return nil, errors.New("client error")
}
backoff := func(int) time.Duration { return 0 }
client, err := elasticsearch.NewVersionedClient("", "", "", []string{"testing.invalid"}, nil, transport, 1, backoff)
require.NoError(t, err)
return client
}

// newMockElasticsearchClient returns an elasticsearch.Clien configured to send
// requests to an httptest.Server that responds to source map search requests
// with the given status code and response body.
func newMockElasticsearchClient(t testing.TB, statusCode int, responseBody io.Reader) elasticsearch.Client {
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(statusCode)
if responseBody != nil {
io.Copy(w, responseBody)
}
}))
t.Cleanup(srv.Close)
config := elasticsearch.DefaultConfig()
config.Backoff.Init = time.Nanosecond
config.Hosts = []string{srv.URL}
client, err := elasticsearch.NewClient(config)
require.NoError(t, err)
return client
}

// validSourcemap is an example of a valid sourcemap for use in tests.
const validSourcemap = `{
"version": 3,
"sources": [
"webpack:///bundle.js",
"",
"webpack:///./scripts/index.js",
"webpack:///./index.html",
"webpack:///./scripts/app.js"
],
"names": [
"modules",
"__webpack_require__",
"moduleId",
"installedModules",
"exports",
"module",
"id",
"loaded",
"call",
"m",
"c",
"p",
"foo",
"console",
"log",
"foobar"
],
"mappings": "CAAS,SAAUA,GCInB,QAAAC,GAAAC,GAGA,GAAAC,EAAAD,GACA,MAAAC,GAAAD,GAAAE,OAGA,IAAAC,GAAAF,EAAAD,IACAE,WACAE,GAAAJ,EACAK,QAAA,EAUA,OANAP,GAAAE,GAAAM,KAAAH,EAAAD,QAAAC,IAAAD,QAAAH,GAGAI,EAAAE,QAAA,EAGAF,EAAAD,QAvBA,GAAAD,KAqCA,OATAF,GAAAQ,EAAAT,EAGAC,EAAAS,EAAAP,EAGAF,EAAAU,EAAA,GAGAV,EAAA,KDMM,SAASI,EAAQD,EAASH,GE3ChCA,EAAA,GAEAA,EAAA,GAEAW,OFmDM,SAASP,EAAQD,EAASH,GGxDhCI,EAAAD,QAAAH,EAAAU,EAAA,cH8DM,SAASN,EAAQD,GI9DvB,QAAAQ,KACAC,QAAAC,IAAAC,QAGAH",
"file": "bundle.js",
"sourcesContent": [
"/******/ (function(modules) { // webpackBootstrap\n/******/ \t// The module cache\n/******/ \tvar installedModules = {};\n/******/\n/******/ \t// The require function\n/******/ \tfunction __webpack_require__(moduleId) {\n/******/\n/******/ \t\t// Check if module is in cache\n/******/ \t\tif(installedModules[moduleId])\n/******/ \t\t\treturn installedModules[moduleId].exports;\n/******/\n/******/ \t\t// Create a new module (and put it into the cache)\n/******/ \t\tvar module = installedModules[moduleId] = {\n/******/ \t\t\texports: {},\n/******/ \t\t\tid: moduleId,\n/******/ \t\t\tloaded: false\n/******/ \t\t};\n/******/\n/******/ \t\t// Execute the module function\n/******/ \t\tmodules[moduleId].call(module.exports, module, module.exports, __webpack_require__);\n/******/\n/******/ \t\t// Flag the module as loaded\n/******/ \t\tmodule.loaded = true;\n/******/\n/******/ \t\t// Return the exports of the module\n/******/ \t\treturn module.exports;\n/******/ \t}\n/******/\n/******/\n/******/ \t// expose the modules object (__webpack_modules__)\n/******/ \t__webpack_require__.m = modules;\n/******/\n/******/ \t// expose the module cache\n/******/ \t__webpack_require__.c = installedModules;\n/******/\n/******/ \t// __webpack_public_path__\n/******/ \t__webpack_require__.p = \"\";\n/******/\n/******/ \t// Load entry module and return exports\n/******/ \treturn __webpack_require__(0);\n/******/ })\n/************************************************************************/\n/******/ ([\n/* 0 */\n/***/ function(module, exports, __webpack_require__) {\n\n\t// Webpack\n\t__webpack_require__(1)\n\t\n\t__webpack_require__(2)\n\t\n\tfoo()\n\n\n/***/ },\n/* 1 */\n/***/ function(module, exports, __webpack_require__) {\n\n\tmodule.exports = __webpack_require__.p + \"index.html\"\n\n/***/ },\n/* 2 */\n/***/ function(module, exports) {\n\n\tfunction foo() {\n\t console.log(foobar)\n\t}\n\t\n\tfoo()\n\n\n/***/ }\n/******/ ]);\n\n\n/** WEBPACK FOOTER **\n ** bundle.js\n **/",
" \t// The module cache\n \tvar installedModules = {};\n\n \t// The require function\n \tfunction __webpack_require__(moduleId) {\n\n \t\t// Check if module is in cache\n \t\tif(installedModules[moduleId])\n \t\t\treturn installedModules[moduleId].exports;\n\n \t\t// Create a new module (and put it into the cache)\n \t\tvar module = installedModules[moduleId] = {\n \t\t\texports: {},\n \t\t\tid: moduleId,\n \t\t\tloaded: false\n \t\t};\n\n \t\t// Execute the module function\n \t\tmodules[moduleId].call(module.exports, module, module.exports, __webpack_require__);\n\n \t\t// Flag the module as loaded\n \t\tmodule.loaded = true;\n\n \t\t// Return the exports of the module\n \t\treturn module.exports;\n \t}\n\n\n \t// expose the modules object (__webpack_modules__)\n \t__webpack_require__.m = modules;\n\n \t// expose the module cache\n \t__webpack_require__.c = installedModules;\n\n \t// __webpack_public_path__\n \t__webpack_require__.p = \"\";\n\n \t// Load entry module and return exports\n \treturn __webpack_require__(0);\n\n\n\n/** WEBPACK FOOTER **\n ** webpack/bootstrap 6002740481c9666b0d38\n **/",
"// Webpack\nrequire('../index.html')\n\nrequire('./app')\n\nfoo()\n\n\n\n/*****************\n ** WEBPACK FOOTER\n ** ./scripts/index.js\n ** module id = 0\n ** module chunks = 0\n **/",
"module.exports = __webpack_public_path__ + \"index.html\"\n\n\n/*****************\n ** WEBPACK FOOTER\n ** ./index.html\n ** module id = 1\n ** module chunks = 0\n **/",
"function foo() {\n console.log(foobar)\n}\n\nfoo()\n\n\n\n/*****************\n ** WEBPACK FOOTER\n ** ./scripts/app.js\n ** module id = 2\n ** module chunks = 0\n **/"
],
"sourceRoot": ""
}`
6 changes: 1 addition & 5 deletions sourcemap/mapper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,6 @@ import (

"github.com/stretchr/testify/require"

"github.com/elastic/apm-server/sourcemap/test"

"github.com/go-sourcemap/sourcemap"
"github.com/stretchr/testify/assert"
)
Expand All @@ -37,7 +35,7 @@ func TestMapNilConsumer(t *testing.T) {
}

func TestMapNoMatch(t *testing.T) {
m, err := sourcemap.Parse("", []byte(test.ValidSourcemap))
m, err := sourcemap.Parse("", []byte(validSourcemap))
require.NoError(t, err)

// nothing found for lineno and colno
Expand All @@ -51,8 +49,6 @@ func TestMapNoMatch(t *testing.T) {
}

func TestMapMatch(t *testing.T) {
validSourcemap := test.ValidSourcemap

// Re-encode the sourcemap, adding carriage returns to the
// line endings in the source content.
decoded := make(map[string]interface{})
Expand Down
32 changes: 12 additions & 20 deletions sourcemap/processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
// specific language governing permissions and limitations
// under the License.

package sourcemap_test
package sourcemap

import (
"context"
Expand All @@ -30,13 +30,13 @@ import (

"github.com/elastic/apm-server/elasticsearch"
"github.com/elastic/apm-server/model"
"github.com/elastic/apm-server/sourcemap"
"github.com/elastic/apm-server/sourcemap/test"
)

func TestBatchProcessor(t *testing.T) {
client := test.ESClientWithValidSourcemap(t)
store, err := sourcemap.NewElasticsearchStore(client, "index", time.Minute)
client := newMockElasticsearchClient(t, http.StatusOK,
sourcemapSearchResponseBody(1, []map[string]interface{}{sourcemapHit(string(validSourcemap))}),
)
store, err := NewElasticsearchStore(client, "index", time.Minute)
require.NoError(t, err)

originalLinenoWithFilename := 1
Expand Down Expand Up @@ -183,7 +183,7 @@ func TestBatchProcessor(t *testing.T) {
},
}

processor := sourcemap.BatchProcessor{Store: store}
processor := BatchProcessor{Store: store}
err = processor.ProcessBatch(context.Background(), &model.Batch{transaction, span1, span2, error1, error2, error3})
assert.NoError(t, err)

Expand Down Expand Up @@ -218,8 +218,8 @@ func TestBatchProcessor(t *testing.T) {
}

func TestBatchProcessorElasticsearchUnavailable(t *testing.T) {
client := test.ESClientUnavailable(t)
store, err := sourcemap.NewElasticsearchStore(client, "index", time.Minute)
client := newUnavailableElasticsearchClient(t)
store, err := NewElasticsearchStore(client, "index", time.Minute)
require.NoError(t, err)

nonMatchingFrame := model.StacktraceFrame{
Expand All @@ -241,7 +241,7 @@ func TestBatchProcessorElasticsearchUnavailable(t *testing.T) {

logp.DevelopmentSetup(logp.ToObserverOutput())
for i := 0; i < 2; i++ {
processor := sourcemap.BatchProcessor{Store: store}
processor := BatchProcessor{Store: store}
err = processor.ProcessBatch(context.Background(), &model.Batch{span, span})
assert.NoError(t, err)
}
Expand All @@ -260,19 +260,11 @@ func TestBatchProcessorElasticsearchUnavailable(t *testing.T) {
func TestBatchProcessorTimeout(t *testing.T) {
var transport roundTripperFunc = func(req *http.Request) (*http.Response, error) {
<-req.Context().Done()
// TODO(axw) remove this "notTimeout" error wrapper when
// https://github.com/elastic/go-elasticsearch/issues/300
// is fixed.
//
// Because context.DeadlineExceeded implements net.Error,
// go-elasticsearch continues retrying and does not exit
// early.
type notTimeout struct{ error }
return nil, notTimeout{req.Context().Err()}
return nil, req.Context().Err()
}
client, err := elasticsearch.NewVersionedClient("", "", "", []string{""}, nil, transport, 3, elasticsearch.DefaultBackoff)
require.NoError(t, err)
store, err := sourcemap.NewElasticsearchStore(client, "index", time.Minute)
store, err := NewElasticsearchStore(client, "index", time.Minute)
require.NoError(t, err)

frame := model.StacktraceFrame{
Expand All @@ -292,7 +284,7 @@ func TestBatchProcessorTimeout(t *testing.T) {
}

before := time.Now()
processor := sourcemap.BatchProcessor{Store: store, Timeout: 100 * time.Millisecond}
processor := BatchProcessor{Store: store, Timeout: 100 * time.Millisecond}
err = processor.ProcessBatch(context.Background(), &model.Batch{span})
assert.NoError(t, err)
taken := time.Since(before)
Expand Down
Loading

0 comments on commit a971d02

Please sign in to comment.