Skip to content

Commit

Permalink
[receiver/mongodb] Drop support for versions prior to 4.0 (open-telem…
Browse files Browse the repository at this point in the history
…etry#16187)

* [receiver/mongodb] Drop support for versions prior to 4.0

* fix changelog component
  • Loading branch information
djaglowski authored and shalper2 committed Dec 6, 2022
1 parent fbd1339 commit 2057d6a
Show file tree
Hide file tree
Showing 8 changed files with 40 additions and 175 deletions.
16 changes: 16 additions & 0 deletions .chloggen/mongodb-update-supported-versions.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: breaking

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: mongodbreceiver

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Drop support for versions of MongoDB prior to 4.0

# One or more tracking issues related to the change
issues: [16182]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:
4 changes: 1 addition & 3 deletions receiver/mongodbreceiver/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,11 @@ The purpose of this receiver is to allow users to monitor metrics from standalon

This receiver supports MongoDB versions:

- 2.6
- 3.0+
- 4.0+
- 5.0

Mongodb recommends to set up a least privilege user (LPU) with a [`clusterMonitor` role](https://www.mongodb.com/docs/v5.0/reference/built-in-roles/#mongodb-authrole-clusterMonitor) in order to collect metrics. Please refer to [lpu.sh](./testdata/integration/scripts/lpu.sh) for an example of how to configure these permissions.


## Configuration

The following settings are optional:
Expand Down Expand Up @@ -61,6 +58,7 @@ The full list of settings exposed for this receiver are documented [here](./conf
## Metrics
The following metric are available with versions:
- `mongodb.extent.count` < 4.4 with mmapv1 storage engine
- `mongodb.session.count` >= 3.0 with wiredTiger storage engine
- `mongodb.cache.operations` >= 3.0 with wiredTiger storage engine
Expand Down
92 changes: 0 additions & 92 deletions receiver/mongodbreceiver/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,22 +38,6 @@ import (
var (
LPUSetupScript = []string{"/lpu.sh"}
setupScript = []string{"/setup.sh"}
containerRequest2_6 = testcontainers.ContainerRequest{
FromDockerfile: testcontainers.FromDockerfile{
Context: filepath.Join("testdata", "integration"),
Dockerfile: "Dockerfile.mongodb.2_6",
},
ExposedPorts: []string{"27017:27017"},
WaitingFor: wait.ForListeningPort("27017").WithStartupTimeout(2 * time.Minute),
}
containerRequest3_0 = testcontainers.ContainerRequest{
FromDockerfile: testcontainers.FromDockerfile{
Context: filepath.Join("testdata", "integration"),
Dockerfile: "Dockerfile.mongodb.3_0",
},
ExposedPorts: []string{"27117:27017"},
WaitingFor: wait.ForListeningPort("27017").WithStartupTimeout(2 * time.Minute),
}
containerRequest4_0 = testcontainers.ContainerRequest{
FromDockerfile: testcontainers.FromDockerfile{
Context: filepath.Join("testdata", "integration"),
Expand Down Expand Up @@ -89,82 +73,6 @@ var (
)

func TestMongodbIntegration(t *testing.T) {
t.Run("Running mongodb 2.6", func(t *testing.T) {
t.Parallel()
container := getContainer(t, containerRequest2_6, setupScript)
defer func() {
require.NoError(t, container.Terminate(context.Background()))
}()
hostname, err := container.Host(context.Background())
require.NoError(t, err)

f := NewFactory()
cfg := f.CreateDefaultConfig().(*Config)
cfg.Hosts = []confignet.NetAddr{
{
Endpoint: net.JoinHostPort(hostname, "27017"),
},
}
cfg.Insecure = true

consumer := new(consumertest.MetricsSink)
settings := componenttest.NewNopReceiverCreateSettings()
rcvr, err := f.CreateMetricsReceiver(context.Background(), settings, cfg, consumer)
require.NoError(t, err, "failed creating metrics receiver")

require.NoError(t, rcvr.Start(context.Background(), componenttest.NewNopHost()))
require.Eventuallyf(t, func() bool {
return len(consumer.AllMetrics()) > 0
}, 2*time.Minute, 1*time.Second, "failed to receive more than 0 metrics")
require.NoError(t, rcvr.Shutdown(context.Background()))

actualMetrics := consumer.AllMetrics()[0]

expectedFile := filepath.Join("testdata", "integration", "expected.2_6.json")
expectedMetrics, err := golden.ReadMetrics(expectedFile)
require.NoError(t, err)

err = scrapertest.CompareMetrics(expectedMetrics, actualMetrics, scrapertest.IgnoreMetricValues())
require.NoError(t, err)
})
t.Run("Running mongodb 3.0", func(t *testing.T) {
t.Parallel()
container := getContainer(t, containerRequest3_0, setupScript)
defer func() {
require.NoError(t, container.Terminate(context.Background()))
}()
hostname, err := container.Host(context.Background())
require.NoError(t, err)

f := NewFactory()
cfg := f.CreateDefaultConfig().(*Config)
cfg.Hosts = []confignet.NetAddr{
{
Endpoint: net.JoinHostPort(hostname, "27117"),
},
}
cfg.Insecure = true

consumer := new(consumertest.MetricsSink)
settings := componenttest.NewNopReceiverCreateSettings()
rcvr, err := f.CreateMetricsReceiver(context.Background(), settings, cfg, consumer)
require.NoError(t, err, "failed creating metrics receiver")

require.NoError(t, rcvr.Start(context.Background(), componenttest.NewNopHost()))
require.Eventuallyf(t, func() bool {
return len(consumer.AllMetrics()) > 0
}, 2*time.Minute, 1*time.Second, "failed to receive more than 0 metrics")
require.NoError(t, rcvr.Shutdown(context.Background()))

actualMetrics := consumer.AllMetrics()[0]

expectedFile := filepath.Join("testdata", "integration", "expected.3_0.json")
expectedMetrics, err := golden.ReadMetrics(expectedFile)
require.NoError(t, err)

err = scrapertest.CompareMetrics(expectedMetrics, actualMetrics, scrapertest.IgnoreMetricValues())
require.NoError(t, err)
})
t.Run("Running mongodb 4.0", func(t *testing.T) {
t.Parallel()
container := getContainer(t, containerRequest4_0, setupScript)
Expand Down
60 changes: 17 additions & 43 deletions receiver/mongodbreceiver/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ func (s *mongodbScraper) recordExtentCount(now pcommon.Timestamp, doc bson.M, db
// Mongo version 4.4+ no longer returns numExtents since it is part of the obsolete MMAPv1
// https://www.mongodb.com/docs/manual/release-notes/4.4-compatibility/#mmapv1-cleanup
mongo44, _ := version.NewVersion("4.4")
if s.mongoVersion.LessThan(mongo44) {
if s.mongoVersion != nil && s.mongoVersion.LessThan(mongo44) {
metricPath := []string{"numExtents"}
metricName := "mongodb.extent.count"
val, err := collectMetric(doc, metricPath)
Expand All @@ -152,13 +152,7 @@ func (s *mongodbScraper) recordExtentCount(now pcommon.Timestamp, doc bson.M, db

// ServerStatus
func (s *mongodbScraper) recordConnections(now pcommon.Timestamp, doc bson.M, dbName string, errs *scrapererror.ScrapeErrors) {
mongo40, _ := version.NewVersion("4.0")
for ctVal, ct := range metadata.MapAttributeConnectionType {
// Mongo version 4.0 added active
// reference: https://www.mongodb.com/docs/v4.0/reference/command/serverStatus/#serverstatus.connections.active
if s.mongoVersion.LessThan(mongo40) && ctVal == "active" {
continue
}
metricPath := []string{"connections", ctVal}
metricName := "mongodb.connection.count"
metricAttributes := fmt.Sprintf("%s, %s", ctVal, dbName)
Expand Down Expand Up @@ -202,13 +196,6 @@ func (s *mongodbScraper) recordDocumentOperations(now pcommon.Timestamp, doc bso
}

func (s *mongodbScraper) recordSessionCount(now pcommon.Timestamp, doc bson.M, errs *scrapererror.ScrapeErrors) {
// Collect session count for version 3.0+
// https://www.mongodb.com/docs/v3.0/reference/command/serverStatus/#serverStatus.wiredTiger.session
mongo30, _ := version.NewVersion("3.0")
if s.mongoVersion.LessThan(mongo30) {
return
}

storageEngine, err := dig(doc, []string{"storageEngine", "name"})
if err != nil {
errs.AddPartial(1, errors.New("failed to find storage engine for session count"))
Expand Down Expand Up @@ -244,14 +231,6 @@ func (s *mongodbScraper) recordOperations(now pcommon.Timestamp, doc bson.M, err
}

func (s *mongodbScraper) recordCacheOperations(now pcommon.Timestamp, doc bson.M, errs *scrapererror.ScrapeErrors) {
// Collect Cache Hits & Misses if wiredTiger storage engine is used
// WiredTiger.cache metrics are available in 3.0+
// https://www.mongodb.com/docs/v4.0/reference/command/serverStatus/#serverstatus.wiredTiger.cache
mongo30, _ := version.NewVersion("3.0")
if s.mongoVersion.LessThan(mongo30) {
return
}

storageEngine, err := dig(doc, []string{"storageEngine", "name"})
if err != nil {
errs.AddPartial(1, errors.New("failed to find storage engine for cache operations"))
Expand Down Expand Up @@ -457,29 +436,24 @@ func (s *mongodbScraper) recordLockDeadlockCount(now pcommon.Timestamp, doc bson

// Index Stats
func (s *mongodbScraper) recordIndexAccess(now pcommon.Timestamp, documents []bson.M, dbName string, collectionName string, errs *scrapererror.ScrapeErrors) {
// Collect the index access given a collection and database if version is >= 3.2
// https://www.mongodb.com/docs/v3.2/reference/operator/aggregation/indexStats/
mongo32, _ := version.NewVersion("3.2")
if s.mongoVersion.GreaterThanOrEqual(mongo32) {
metricName := "mongodb.index.access.count"
var indexAccessTotal int64
for _, doc := range documents {
metricAttributes := fmt.Sprintf("%s, %s", dbName, collectionName)
indexAccess, ok := doc["accesses"].(bson.M)["ops"]
if !ok {
err := errors.New("could not find key for index access metric")
errs.AddPartial(1, fmt.Errorf(collectMetricWithAttributes, metricName, metricAttributes, err))
return
}
indexAccessValue, err := parseInt(indexAccess)
if err != nil {
errs.AddPartial(1, fmt.Errorf(collectMetricWithAttributes, metricName, metricAttributes, err))
return
}
indexAccessTotal += indexAccessValue
metricName := "mongodb.index.access.count"
var indexAccessTotal int64
for _, doc := range documents {
metricAttributes := fmt.Sprintf("%s, %s", dbName, collectionName)
indexAccess, ok := doc["accesses"].(bson.M)["ops"]
if !ok {
err := errors.New("could not find key for index access metric")
errs.AddPartial(1, fmt.Errorf(collectMetricWithAttributes, metricName, metricAttributes, err))
return
}
indexAccessValue, err := parseInt(indexAccess)
if err != nil {
errs.AddPartial(1, fmt.Errorf(collectMetricWithAttributes, metricName, metricAttributes, err))
return
}
s.mb.RecordMongodbIndexAccessCountDataPoint(now, indexAccessTotal, dbName, collectionName)
indexAccessTotal += indexAccessValue
}
s.mb.RecordMongodbIndexAccessCountDataPoint(now, indexAccessTotal, dbName, collectionName)
}

// Top Stats
Expand Down
16 changes: 6 additions & 10 deletions receiver/mongodbreceiver/scraper.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,10 +70,11 @@ func (s *mongodbScraper) scrape(ctx context.Context) (pmetric.Metrics, error) {

if s.mongoVersion == nil {
version, err := s.client.GetVersion(ctx)
if err != nil {
return pmetric.NewMetrics(), fmt.Errorf("unable to determine version of mongo scraping against: %w", err)
if err == nil {
s.mongoVersion = version
} else {
s.logger.Warn("determine mongo version", zap.Error(err))
}
s.mongoVersion = version
}

errs := &scrapererror.ScrapeErrors{}
Expand Down Expand Up @@ -102,13 +103,8 @@ func (s *mongodbScraper) collectMetrics(ctx context.Context, errs *scrapererror.
return
}

// Mongo version 4.0+ is required to have authorized access to list collection names
// reference: https://www.mongodb.com/docs/manual/reference/method/db.getCollectionNames/
mongo40, _ := version.NewVersion("4.0")
if s.mongoVersion.GreaterThanOrEqual(mongo40) {
for _, collectionName := range collectionNames {
s.collectIndexStats(ctx, now, dbName, collectionName, errs)
}
for _, collectionName := range collectionNames {
s.collectIndexStats(ctx, now, dbName, collectionName, errs)
}
}
}
Expand Down
15 changes: 0 additions & 15 deletions receiver/mongodbreceiver/scraper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,21 +135,6 @@ func TestScraperScrape(t *testing.T) {
},
expectedErr: errors.New("no client was initialized before calling scrape"),
},
{
desc: "Failed to get version",
partialErr: false,
setupMockClient: func(t *testing.T) client {
fc := &fakeClient{}
mongo40, err := version.NewVersion("4.0")
require.NoError(t, err)
fc.On("GetVersion", mock.Anything).Return(mongo40, errors.New("some version error"))
return fc
},
expectedMetricGen: func(t *testing.T) pmetric.Metrics {
return pmetric.NewMetrics()
},
expectedErr: errors.New("unable to determine version of mongo scraping against: some version error"),
},
{
desc: "Failed to fetch database names",
partialErr: true,
Expand Down

This file was deleted.

This file was deleted.

0 comments on commit 2057d6a

Please sign in to comment.