Skip to content

Commit

Permalink
[chore] Migrate jmx receiver integration test (#23066)
Browse files Browse the repository at this point in the history
  • Loading branch information
djaglowski authored Jun 7, 2023
1 parent 99bd0a9 commit 8dc41a8
Show file tree
Hide file tree
Showing 6 changed files with 66 additions and 124 deletions.
13 changes: 11 additions & 2 deletions internal/coreinternal/scraperinttest/scraperint.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ func NewIntegrationTest(f receiver.Factory, opts ...TestOption) *IntegrationTest
type IntegrationTest struct {
networkRequest *testcontainers.NetworkRequest
containerRequests []testcontainers.ContainerRequest
allowHardcodedHostPort bool
createContainerTimeout time.Duration

factory receiver.Factory
Expand Down Expand Up @@ -199,8 +200,10 @@ func (it *IntegrationTest) validate(t *testing.T) {
} else {
containerNames[cr.Name] = true
}
for _, port := range cr.ExposedPorts {
require.False(t, strings.ContainsRune(port, ':'), "exposed port hardcoded to host port: %q", port)
if !it.allowHardcodedHostPort {
for _, port := range cr.ExposedPorts {
require.False(t, strings.ContainsRune(port, ':'), "exposed port hardcoded to host port: %q", port)
}
}
require.NoError(t, cr.Validate())
}
Expand All @@ -220,6 +223,12 @@ func WithContainerRequest(cr testcontainers.ContainerRequest) TestOption {
}
}

func AllowHardcodedHostPort() TestOption {
return func(it *IntegrationTest) {
it.allowHardcodedHostPort = true
}
}

func WithCreateContainerTimeout(t time.Duration) TestOption {
return func(it *IntegrationTest) {
it.createContainerTimeout = t
Expand Down
171 changes: 54 additions & 117 deletions receiver/jmxreceiver/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,22 +16,22 @@ import (
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/stretchr/testify/suite"
"github.com/testcontainers/testcontainers-go"
"github.com/testcontainers/testcontainers-go/wait"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/component/componenttest"
"go.opentelemetry.io/collector/consumer/consumertest"
"go.opentelemetry.io/collector/exporter/exporterhelper"
"go.opentelemetry.io/collector/receiver/receivertest"
"go.uber.org/zap"
"go.uber.org/zap/zaptest/observer"

"github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal/golden"
"github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal/scraperinttest"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatatest/pmetrictest"
)

const jmxPort = "7199"

var jmxJarReleases = map[string]string{
"1.26.0-alpha": "https://repo1.maven.org/maven2/io/opentelemetry/contrib/opentelemetry-jmx-metrics/1.26.0-alpha/opentelemetry-jmx-metrics-1.26.0-alpha.jar",
"1.10.0-alpha": "https://repo1.maven.org/maven2/io/opentelemetry/contrib/opentelemetry-jmx-metrics/1.10.0-alpha/opentelemetry-jmx-metrics-1.10.0-alpha.jar",
Expand All @@ -52,8 +52,8 @@ func (suite *JMXIntegrationSuite) SetupSuite() {
suite.VersionToJar = make(map[string]string)
for version, url := range jmxJarReleases {
jarPath, err := downloadJMXMetricGathererJAR(url)
require.NoError(suite.T(), err)
suite.VersionToJar[version] = jarPath
require.NoError(suite.T(), err)
}
}

Expand All @@ -80,124 +80,61 @@ func downloadJMXMetricGathererJAR(url string) (string, error) {
return file.Name(), err
}

func cassandraContainer(t *testing.T) testcontainers.Container {
ctx := context.Background()
req := testcontainers.ContainerRequest{
FromDockerfile: testcontainers.FromDockerfile{
Context: "testdata",
Dockerfile: "Dockerfile.cassandra",
},
ExposedPorts: []string{"7199:7199"},
WaitingFor: wait.ForListeningPort("7199"),
}
cassandra, err := testcontainers.GenericContainer(ctx, testcontainers.GenericContainerRequest{
ContainerRequest: req,
Started: true,
})
require.NoError(t, err)
return cassandra
}

func getJavaStdout(receiver *jmxMetricReceiver) string {
msg := ""
LOOP:
for i := 0; i < 70; i++ {
t := time.NewTimer(5 * time.Second)
select {
case m, ok := <-receiver.subprocess.Stdout:
if ok {
msg = msg + m + "\n"
} else {
break LOOP
}
case <-t.C:
break LOOP
}
}
return fmt.Sprintf("metrics not collected: %v\n", msg)
}

func getLogsOnFailure(t *testing.T, logObserver *observer.ObservedLogs) {
if !t.Failed() {
return
}
fmt.Printf("Logs: \n")
for _, statement := range logObserver.All() {
fmt.Printf("%v\n", statement)
func (suite *JMXIntegrationSuite) TestJMXReceiverHappyPath() {
for version, jar := range suite.VersionToJar {
suite.T().Run(version, integrationTest(version, jar))
}
}

func (suite *JMXIntegrationSuite) TestJMXReceiverHappyPath() {

for version, jar := range suite.VersionToJar {
t := suite.T()
// Run one test per JMX receiver version we're integrating with.
t.Run(version, func(t *testing.T) {
cassandra := cassandraContainer(t)
defer func() {
require.NoError(t, cassandra.Terminate(context.Background()))
}()
hostname, err := cassandra.Host(context.Background())
require.NoError(t, err)

logCore, logObserver := observer.New(zap.DebugLevel)
defer getLogsOnFailure(t, logObserver)

logger := zap.New(logCore)
params := receivertest.NewNopCreateSettings()
params.Logger = logger

cfg := &Config{
CollectionInterval: 3 * time.Second,
Endpoint: fmt.Sprintf("%v:7199", hostname),
JARPath: jar,
TargetSystem: "cassandra",
OTLPExporterConfig: otlpExporterConfig{
Endpoint: "127.0.0.1:0",
TimeoutSettings: exporterhelper.TimeoutSettings{
Timeout: 1000 * time.Millisecond,
},
func integrationTest(version string, jar string) func(*testing.T) {
return scraperinttest.NewIntegrationTest(
NewFactory(),
scraperinttest.WithContainerRequest(
testcontainers.ContainerRequest{
Image: "cassandra:3.11",
Env: map[string]string{
"LOCAL_JMX": "no",
"JVM_OPTS": "-Djava.rmi.server.hostname=0.0.0.0",
},
Password: "cassandra",
Username: "cassandra",
ResourceAttributes: map[string]string{
Files: []testcontainers.ContainerFile{{
HostFilePath: filepath.Join("testdata", "integration", "jmxremote.password"),
ContainerFilePath: "/etc/cassandra/jmxremote.password",
FileMode: 400,
}},
ExposedPorts: []string{jmxPort + ":" + jmxPort},
WaitingFor: wait.ForListeningPort(jmxPort),
}),
scraperinttest.AllowHardcodedHostPort(),
scraperinttest.WithCustomConfig(
func(t *testing.T, cfg component.Config, ci *scraperinttest.ContainerInfo) {
rCfg := cfg.(*Config)
rCfg.CollectionInterval = 3 * time.Second
rCfg.JARPath = jar
rCfg.Endpoint = fmt.Sprintf("%v:%s", ci.Host(t), ci.MappedPort(t, jmxPort))
rCfg.TargetSystem = "cassandra"
rCfg.Username = "cassandra"
rCfg.Password = "cassandra"
rCfg.ResourceAttributes = map[string]string{
"myattr": "myvalue",
"myotherattr": "myothervalue",
},
LogLevel: "debug",
}
require.NoError(t, cfg.Validate())

consumer := new(consumertest.MetricsSink)
require.NotNil(t, consumer)

receiver := newJMXMetricReceiver(params, cfg, consumer)
require.NotNil(t, receiver)
defer func() {
require.Nil(t, receiver.Shutdown(context.Background()))
}()

require.NoError(t, receiver.Start(context.Background(), componenttest.NewNopHost()))

// Wait for multiple collections, in case the first represents partially started system
require.Eventually(t, func() bool {
return len(consumer.AllMetrics()) > 1
}, 30*time.Second, 100*time.Millisecond, getJavaStdout(receiver))

metric := consumer.AllMetrics()[1]

// golden.WriteMetrics(t, filepath.Join("testdata", "integration", fmt.Sprintf("expected.%s.yaml", version)), metric)
expected, err := golden.ReadMetrics(filepath.Join("testdata", "integration", fmt.Sprintf("expected.%s.yaml", version)))
assert.NoError(t, err)
assert.NoError(t, pmetrictest.CompareMetrics(expected, metric,
pmetrictest.IgnoreStartTimestamp(),
pmetrictest.IgnoreTimestamp(),
pmetrictest.IgnoreResourceMetricsOrder(),
pmetrictest.IgnoreMetricValues(),
pmetrictest.IgnoreMetricsOrder(),
pmetrictest.IgnoreMetricDataPointsOrder()))
})
}
}
rCfg.OTLPExporterConfig = otlpExporterConfig{
Endpoint: "127.0.0.1:0",
TimeoutSettings: exporterhelper.TimeoutSettings{
Timeout: time.Second,
},
}
}),
scraperinttest.WithExpectedFile(filepath.Join("testdata", "integration", version, "expected.yaml")),
scraperinttest.WithCompareOptions(
pmetrictest.IgnoreStartTimestamp(),
pmetrictest.IgnoreTimestamp(),
pmetrictest.IgnoreResourceMetricsOrder(),
pmetrictest.IgnoreMetricValues(),
pmetrictest.IgnoreMetricsOrder(),
pmetrictest.IgnoreMetricDataPointsOrder(),
),
).Run
}

func TestJMXReceiverInvalidOTLPEndpointIntegration(t *testing.T) {
Expand Down
5 changes: 0 additions & 5 deletions receiver/jmxreceiver/testdata/Dockerfile.cassandra

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
cassandra cassandra

0 comments on commit 8dc41a8

Please sign in to comment.