Skip to content

Commit

Permalink
Merge branch 'master' into pubsub-close
Browse files Browse the repository at this point in the history
  • Loading branch information
axw authored Jun 30, 2021
2 parents 1d70eb6 + 846116b commit b9f7185
Show file tree
Hide file tree
Showing 6 changed files with 81 additions and 18 deletions.
9 changes: 7 additions & 2 deletions agentcfg/reporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package agentcfg

import (
"context"
"sync"
"time"

"github.com/elastic/apm-server/model"
Expand Down Expand Up @@ -64,8 +65,10 @@ func (r Reporter) Fetch(ctx context.Context, query Query) (Result, error) {
}

func (r Reporter) Run(ctx context.Context) error {
// keeps track of the agent_configs that have been queried and applied
// to agents.
var wg sync.WaitGroup
defer wg.Wait()

// applied tracks the etags of agent config that has been applied.
applied := make(map[string]struct{})
t := time.NewTicker(r.interval)
defer t.Stop()
Expand All @@ -92,7 +95,9 @@ func (r Reporter) Run(ctx context.Context) error {
// Reset applied map, so that we report only configs applied
// during a given iteration.
applied = make(map[string]struct{})
wg.Add(1)
go func() {
defer wg.Done()
if err := r.p.ProcessBatch(ctx, batch); err != nil {
r.logger.Errorf("error sending applied agent configs to kibana: %v", err)
}
Expand Down
17 changes: 13 additions & 4 deletions agentcfg/reporter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"time"

"github.com/stretchr/testify/assert"
"golang.org/x/sync/errgroup"

"github.com/elastic/apm-server/model"
)
Expand All @@ -34,8 +35,11 @@ func TestReportFetch(t *testing.T) {
defer close(receivedc)
bp := &batchProcessor{receivedc: receivedc}
r := NewReporter(fauxFetcher{}, bp, interval)

var g errgroup.Group
ctx, cancel := context.WithCancel(context.Background())
go r.Run(ctx)
g.Go(func() error { return r.Run(ctx) })

query1 := Query{
Service: Service{Name: "webapp", Environment: "production"},
Etag: "abc123",
Expand All @@ -56,10 +60,15 @@ func TestReportFetch(t *testing.T) {

// cancel the context to stop processing
cancel()
g.Wait()

assert.Len(t, bp.received, 2)
assert.Equal(t, "abc123", bp.received[0].Labels["etag"])
assert.Equal(t, "def456", bp.received[1].Labels["etag"])
// We use assert.ElementsMatch because the etags may not be
// reported in exactly the same order they were fetched.
etags := make([]string, len(bp.received))
for i, received := range bp.received {
etags[i] = received.Labels["etag"].(string)
}
assert.ElementsMatch(t, []string{"abc123", "def456"}, etags)
}

type fauxFetcher struct{}
Expand Down
3 changes: 3 additions & 0 deletions docs/secure-communication-agents.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -463,6 +463,9 @@ as there is no way to prevent them from being publicly exposed.

**APM Server configuration**

NOTE: {ess} and {ece} deployments provision a secret token when the deployment is created.
The secret token can be found and reset in the {ecloud} console under **Deployments** -- **APM & Fleet**.

Here's how you set the secret token in APM Server:

[source,yaml]
Expand Down
16 changes: 16 additions & 0 deletions kibana/send_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,22 @@ func flattenAndClean(conf *ucfg.Config) (map[string]interface{}, error) {
if strings.HasPrefix(k, "instrumentation") {
continue
}
if strings.HasPrefix(k, "logging.") {
switch k[8:] {
case "level", "selectors", "metrics.enabled", "metrics.period":
default:
continue
}
}
if strings.HasPrefix(k, "path") {
continue
}
if k == "gc_percent" || k == "name" || k == "xpack.monitoring.enabled" {
continue
}
if k == "apm-server.host" {
v = "0.0.0.0:8200"
}
if strings.HasPrefix(k, "apm-server.ssl.") {
// Following ssl related settings need to be synced:
// apm-server.ssl.enabled
Expand Down
34 changes: 32 additions & 2 deletions kibana/send_config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ import (
)

func TestFlattenAndFormat(t *testing.T) {
tlsFieldsCount := 0
tlsFieldsCount, loggingFieldCount := 0, 0
cc, err := common.NewConfigWithYAML([]byte(serverYAML), "apm-server.yml")
c := ucfg.Config(*cc)
require.NoError(t, err)
Expand All @@ -42,10 +42,25 @@ func TestFlattenAndFormat(t *testing.T) {
assert.Contains(t, flat, "schema")

flat = flat["schema"].(map[string]interface{})
for k := range flat {
for k, v := range flat {
assert.NotContains(t, k, "elasticsearch")
assert.NotContains(t, k, "kibana")
assert.NotContains(t, k, "instrumentation")
assert.NotContains(t, k, "path.")
assert.NotEqual(t, k, "name")
assert.NotContains(t, k, "gc_percent")
assert.NotContains(t, k, "xpack.monitoring.enabled")
if strings.HasPrefix(k, "logging.") {
switch k {
case "logging.level", "logging.selectors", "logging.metrics.enabled", "logging.metrics.period":
loggingFieldCount++
default:
assert.Fail(t, fmt.Sprintf("should not be present: %s", k))
}
}
if k == "apm-server.host" {
assert.Equal(t, "0.0.0.0:8200", v)
}
if strings.HasPrefix(k, "apm-server.ssl.") {
switch k[15:] {
case "enabled", "certificate", "key":
Expand Down Expand Up @@ -78,11 +93,26 @@ var serverYAML = `apm-server:
certificate: 'my-cert'
key_passphrase: 'pass-phrase'
verify_mode: 'strict'
name: 'test-name'
rum:
enabled: false
event_rate:
limit: 300
lru_size: 1000
gc_percent: 70
logging:
level: 'debug'
selectors: ['intake']
metrics:
enabled: true
period: 10s
files.name: "apm.log"
json: true
path.config: "/app/config"
path.data: "/app/data"
path:
home: "/app/"
xpack.monitoring.enabled: true
output.elasticsearch:
hosts: ["localhost:9200"]
enabled: true
Expand Down
20 changes: 10 additions & 10 deletions sourcemap/store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,17 +154,21 @@ func TestStore_Fetch(t *testing.T) {
}

func TestFetchTimeout(t *testing.T) {
// TODO(stn): fix this flaky test
t.Skip()
var (
errs int64

apikey = "supersecret"
name = "webapp"
version = "1.0.0"
path = "/my/path/to/bundle.js.map"
c = http.DefaultClient
apikey = "supersecret"
name = "webapp"
version = "1.0.0"
path = "/my/path/to/bundle.js.map"
c = http.DefaultClient
ctx, cancel = context.WithTimeout(context.Background(), time.Millisecond)
)
defer cancel()
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
<-r.Context().Done()
<-ctx.Done()
}))
defer ts.Close()

Expand All @@ -188,11 +192,7 @@ func TestFetchTimeout(t *testing.T) {
store, err := newStore(b, logger, time.Minute)
assert.NoError(t, err)

ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond)
defer cancel()

_, err = store.Fetch(ctx, name, version, path)
time.Sleep(10 * time.Millisecond)
assert.True(t, errors.Is(err, context.DeadlineExceeded))
atomic.AddInt64(&errs, 1)

Expand Down

0 comments on commit b9f7185

Please sign in to comment.