Skip to content

Commit

Permalink
coordinate successful and failing fetches (#6699) (#6740)
Browse files Browse the repository at this point in the history
make sure that the servers all receive their
requests before confirming that a successful
response from one server cancels the shared
context in the fetch code.

(cherry picked from commit 039bffa)

Co-authored-by: stuart nelson <[email protected]>
  • Loading branch information
mergify[bot] and stuartnelson3 authored Nov 29, 2021
1 parent 12a0735 commit 5b170ea
Showing 1 changed file with 47 additions and 6 deletions.
53 changes: 47 additions & 6 deletions sourcemap/fleet_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,12 @@ import (
"context"
"net/http"
"net/http/httptest"
"sync"
"sync/atomic"
"testing"

"github.com/go-sourcemap/sourcemap"

"github.com/elastic/apm-server/beater/config"

"github.com/stretchr/testify/assert"
Expand Down Expand Up @@ -83,27 +86,53 @@ func TestFleetFetch(t *testing.T) {
}

func TestFailedAndSuccessfulFleetHostsFetch(t *testing.T) {
type response struct {
consumer *sourcemap.Consumer
err error
}
var (
apikey = "supersecret"
name = "webapp"
version = "1.0.0"
path = "/my/path/to/bundle.js.map"
c = http.DefaultClient
sourceMapPath = "/api/fleet/artifact"
successc = make(chan struct{})
errc = make(chan struct{})
lastc = make(chan struct{})
waitc = make(chan struct{})
resc = make(chan response)
wg sync.WaitGroup
)
wg.Add(3)
defer func() {
close(successc)
close(errc)
close(lastc)
close(resc)
}()

hError := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
wg.Done()
errc <- struct{}{}
http.Error(w, "err", http.StatusInternalServerError)
})
ts0 := httptest.NewServer(hError)

h := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
h1 := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
wg.Done()
successc <- struct{}{}
wr := zlib.NewWriter(w)
defer wr.Close()
wr.Write([]byte(resp))
})
ts1 := httptest.NewServer(h)
ts2 := httptest.NewServer(h)
ts1 := httptest.NewServer(h1)
h2 := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
wg.Done()
lastc <- struct{}{}
close(waitc)
})
ts2 := httptest.NewServer(h2)

fleetCfg := &config.Fleet{
Hosts: []string{ts0.URL[7:], ts1.URL[7:], ts2.URL[7:]},
Expand All @@ -123,9 +152,21 @@ func TestFailedAndSuccessfulFleetHostsFetch(t *testing.T) {
f, err := NewFleetFetcher(c, fleetCfg, cfgs)
assert.NoError(t, err)

consumer, err := f.Fetch(context.Background(), name, version, path)
assert.NoError(t, err)
assert.NotNil(t, consumer)
go func() {
consumer, err := f.Fetch(context.Background(), name, version, path)
resc <- response{consumer, err}
}()
// Make sure every server has received a request
wg.Wait()
<-errc
<-successc
res := <-resc
assert.NoError(t, res.err)
assert.NotNil(t, res.consumer)

// Wait for h2
<-lastc
<-waitc
}

func TestAllFailedFleetHostsFetch(t *testing.T) {
Expand Down

0 comments on commit 5b170ea

Please sign in to comment.