Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[receiver/mongodbatlas] Safeguard polled alerts potentially overloading exporters #14611

Merged
16 changes: 16 additions & 0 deletions .chloggen/atlas-polling-scalability-concern.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: 'enhancement'

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: mongodbatlasreceiver

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Safeguard against large amounts of alert emissions.

# One or more tracking issues related to the change
issues: [14610]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:
6 changes: 4 additions & 2 deletions receiver/mongodbatlasreceiver/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,10 @@ MongoDB Atlas [Documentation](https://www.mongodb.com/docs/atlas/reference/api/l
- `secret` (required if using `listen` mode)
- `endpoint` (required if using `listen` mode)
- `poll_interval` (default `5m`, only relevant using `poll` mode)
- `max_alerts` (default `1000`)
- When in `poll` mode, this is the maximum number of alerts the receiver will attempt to process per poll. Alerts are processed from most recently created to least recently created.
- `page_size` (default `100`)
- When in `poll` mode, this is the number of alerts that will be processed per request to the MongoDB Atlas API. Must be a value between `1` and `500`.
- `max_pages` (default `10`)
- When in `poll` mode, this will limit how many pages of alerts the receiver will request for each project.
- `projects` (required if using `poll` mode)
- `name` (required if using `poll` mode)
- `include_clusters` (default empty, exclusive with `exclude_clusters`)
Expand Down
56 changes: 38 additions & 18 deletions receiver/mongodbatlasreceiver/alerts.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,12 +60,14 @@ const (
alertCacheKey = "last_recorded_alert"

defaultAlertsPollInterval = 5 * time.Minute
defaultMaxAlerts = 1000
// defaults were based off API docs https://www.mongodb.com/docs/atlas/reference/api/alerts-get-all-alerts/
defaultAlertsPageSize = 100
defaultAlertsMaxPages = 10
)

type alertsClient interface {
GetProject(ctx context.Context, groupID string) (*mongodbatlas.Project, error)
GetAlerts(ctx context.Context, groupID string, maxAlerts int64) ([]mongodbatlas.Alert, error)
GetAlerts(ctx context.Context, groupID string, opts *internal.AlertPollOptions) ([]mongodbatlas.Alert, bool, error)
}

type alertsReceiver struct {
Expand All @@ -86,7 +88,8 @@ type alertsReceiver struct {
retrySettings exporterhelper.RetrySettings
pollInterval time.Duration
record *alertRecord
maxAlerts int64
pageSize int64
maxPages int64
doneChan chan bool
id config.ComponentID // ID of the receiver component
storageID *config.ComponentID // ID of the storage extension component
Expand Down Expand Up @@ -122,7 +125,8 @@ func newAlertsReceiver(logger *zap.Logger, baseConfig *Config, consumer consumer
privateKey: baseConfig.PrivateKey,
wg: &sync.WaitGroup{},
pollInterval: baseConfig.Alerts.PollInterval,
maxAlerts: baseConfig.Alerts.MaxAlertProcessing,
maxPages: baseConfig.Alerts.MaxPages,
pageSize: baseConfig.Alerts.PageSize,
doneChan: make(chan bool, 1),
logger: logger,
id: baseConfig.ID(),
Expand Down Expand Up @@ -182,30 +186,46 @@ func (a *alertsReceiver) startPolling(ctx context.Context, host component.Host)
}

func (a *alertsReceiver) retrieveAndProcessAlerts(ctx context.Context) error {
var alerts []mongodbatlas.Alert
for _, p := range a.projects {
project, err := a.client.GetProject(ctx, p.Name)
if err != nil {
a.logger.Error("error retrieving project "+p.Name+":", zap.Error(err))
continue
}
projectAlerts, err := a.client.GetAlerts(ctx, project.ID, a.maxAlerts)
a.pollAndProcess(ctx, p, project)
}
return a.writeCheckpoint(ctx)
}

func (a *alertsReceiver) pollAndProcess(ctx context.Context, pc *ProjectConfig, project *mongodbatlas.Project) {
for pageNum := 1; pageNum <= int(a.maxPages); pageNum++ {
projectAlerts, hasNext, err := a.client.GetAlerts(ctx, project.ID, &internal.AlertPollOptions{
PageNum: pageNum,
PageSize: int(a.pageSize),
})
if err != nil {
a.logger.Error("unable to get alerts for project", zap.Error(err))
continue
break
}

filteredAlerts := a.applyFilters(pc, projectAlerts)
now := pcommon.NewTimestampFromTime(time.Now())
logs, err := a.convertAlerts(now, filteredAlerts)
if err != nil {
a.logger.Error("error processing alerts", zap.Error(err))
break
}

if logs.LogRecordCount() > 0 {
if err = a.consumer.ConsumeLogs(ctx, logs); err != nil {
a.logger.Error("error consuming alerts", zap.Error(err))
break
}
}
if !hasNext {
break
}
filteredAlerts := a.applyFilters(p, projectAlerts)
alerts = append(alerts, filteredAlerts...)
}
now := pcommon.NewTimestampFromTime(time.Now())
logs, err := a.convertAlerts(now, alerts)
if err != nil {
return err
}
if logs.LogRecordCount() > 0 {
return a.consumer.ConsumeLogs(ctx, logs)
}
return a.writeCheckpoint(ctx)
}

func (a *alertsReceiver) startListening(ctx context.Context, host component.Host) error {
Expand Down
4 changes: 3 additions & 1 deletion receiver/mongodbatlasreceiver/alerts_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -219,7 +219,7 @@ func TestAtlasPoll(t *testing.T) {
mockClient.On("GetProject", mock.Anything, testProjectName).Return(&mongodbatlas.Project{
ID: testProjectID,
}, nil)
mockClient.On("GetAlerts", mock.Anything, testProjectID, mock.Anything).Return(alerts, nil)
mockClient.On("GetAlerts", mock.Anything, testProjectID, mock.Anything).Return(alerts, false, nil)

sink := &consumertest.LogsSink{}
fact := NewFactory()
Expand All @@ -237,6 +237,8 @@ func TestAtlasPoll(t *testing.T) {
},
},
PollInterval: 1 * time.Second,
PageSize: defaultAlertsPageSize,
MaxPages: defaultAlertsMaxPages,
},
},
sink,
Expand Down
64 changes: 39 additions & 25 deletions receiver/mongodbatlasreceiver/alerts_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ import (
"go.uber.org/zap"
"go.uber.org/zap/zaptest"

"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/mongodbatlasreceiver/internal"
"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/mongodbatlasreceiver/internal/model"
)

Expand Down Expand Up @@ -612,6 +613,8 @@ func TestAlertsRetrieval(t *testing.T) {
Name: testProjectName,
},
},
PageSize: defaultAlertsPageSize,
MaxPages: defaultAlertsMaxPages,
PollInterval: 1 * time.Second,
},
}
Expand Down Expand Up @@ -645,6 +648,8 @@ func TestAlertsRetrieval(t *testing.T) {
IncludeClusters: []string{testClusterName},
},
},
PageSize: defaultAlertsPageSize,
MaxPages: defaultAlertsMaxPages,
PollInterval: 1 * time.Second,
},
}
Expand Down Expand Up @@ -685,12 +690,15 @@ func TestAlertPollingExclusions(t *testing.T) {
alertsRcvr, err := newAlertsReceiver(zap.NewNop(), &Config{
Alerts: AlertConfig{
Enabled: true,
Mode: alertModePoll,
Projects: []*ProjectConfig{
{
Name: testProjectName,
ExcludeClusters: []string{testClusterName},
},
},
PageSize: defaultAlertsPageSize,
MaxPages: defaultAlertsMaxPages,
PollInterval: 1 * time.Second,
},
}, logSink)
Expand All @@ -703,6 +711,8 @@ func TestAlertPollingExclusions(t *testing.T) {
require.Never(t, func() bool {
return logSink.LogRecordCount() > 0
}, 3*time.Second, 10*time.Millisecond)

require.NoError(t, alertsRcvr.Shutdown(context.Background()))
}

func testClient() *mockAlertsClient {
Expand All @@ -715,32 +725,36 @@ func testClient() *mockAlertsClient {
}, nil)
ac.On("GetAlerts", mock.Anything, testProjectID, mock.Anything).Return(
[]mongodbatlas.Alert{
{
ID: testAlertID,
GroupID: testGroupID,
AlertConfigID: "",
EventTypeName: testTypeName,
Created: time.Now().Format(time.RFC3339),
Updated: time.Now().Format(time.RFC3339),
Enabled: new(bool),
Status: "TRACKING",
MetricName: testMetricName,
CurrentValue: &mongodbatlas.CurrentValue{
Number: new(float64),
Units: "By",
},
ReplicaSetName: "",
ClusterName: testClusterName,
HostnameAndPort: testHostNameAndPort,
Matchers: []mongodbatlas.Matcher{},
MetricThreshold: &mongodbatlas.MetricThreshold{},
Notifications: []mongodbatlas.Notification{},
},
testAlert(),
},
nil)
false, nil)
return ac
}

// testAlert returns the canned Atlas alert that testClient hands back from the
// mocked GetAlerts call. Identifying fields come from the package-level test
// constants; Created and Updated are stamped with the current time in RFC 3339
// form, and the pointer fields reference zero values (false / 0).
func testAlert() mongodbatlas.Alert {
	// Two separate clock reads, in this order, exactly as the fixture has
	// always been built.
	createdAt := time.Now().Format(time.RFC3339)
	updatedAt := time.Now().Format(time.RFC3339)

	// Pointers to zero values: Enabled -> false, Number -> 0.
	enabled := new(bool)
	currentNumber := new(float64)

	alert := mongodbatlas.Alert{
		ID:            testAlertID,
		GroupID:       testGroupID,
		AlertConfigID: "",
		EventTypeName: testTypeName,
		Created:       createdAt,
		Updated:       updatedAt,
		Enabled:       enabled,
		Status:        "TRACKING",
		MetricName:    testMetricName,
		CurrentValue: &mongodbatlas.CurrentValue{
			Number: currentNumber,
			Units:  "By",
		},
		ReplicaSetName:  "",
		ClusterName:     testClusterName,
		HostnameAndPort: testHostNameAndPort,
		// Empty (non-nil) collections, not nil slices.
		Matchers:        []mongodbatlas.Matcher{},
		MetricThreshold: &mongodbatlas.MetricThreshold{},
		Notifications:   []mongodbatlas.Notification{},
	}
	return alert
}

func validateAttributes(t *testing.T, expectedStringAttributes map[string]string, logs plog.Logs) {
for i := 0; i < logs.ResourceLogs().Len(); i++ {
rl := logs.ResourceLogs().At(0)
Expand All @@ -767,7 +781,7 @@ func (mac *mockAlertsClient) GetProject(ctx context.Context, pID string) (*mongo
return args.Get(0).(*mongodbatlas.Project), args.Error(1)
}

func (mac *mockAlertsClient) GetAlerts(ctx context.Context, pID string, maxAlerts int64) ([]mongodbatlas.Alert, error) {
args := mac.Called(ctx, pID, maxAlerts)
return args.Get(0).([]mongodbatlas.Alert), args.Error(1)
// GetAlerts is the mock implementation of alertsClient.GetAlerts. It records
// the call with the supplied arguments and replays whatever was configured via
// On("GetAlerts", ...).Return(alerts, hasNext, err).
func (mac *mockAlertsClient) GetAlerts(ctx context.Context, pID string, opts *internal.AlertPollOptions) ([]mongodbatlas.Alert, bool, error) {
	recorded := mac.Called(ctx, pID, opts)

	// Return values are unpacked positionally: 0 = alerts, 1 = bool flag, 2 = error.
	alerts := recorded.Get(0).([]mongodbatlas.Alert)
	more := recorded.Bool(1)
	return alerts, more, recorded.Error(2)
}
14 changes: 11 additions & 3 deletions receiver/mongodbatlasreceiver/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,9 +52,10 @@ type AlertConfig struct {
Mode string `mapstructure:"mode"`

// these parameters are only relevant in retrieval mode
Projects []*ProjectConfig `mapstructure:"projects"`
PollInterval time.Duration `mapstructure:"poll_interval"`
MaxAlertProcessing int64 `mapstructure:"max_alerts"`
Projects []*ProjectConfig `mapstructure:"projects"`
PollInterval time.Duration `mapstructure:"poll_interval"`
PageSize int64 `mapstructure:"page_size"`
MaxPages int64 `mapstructure:"max_pages"`
}

type LogConfig struct {
Expand Down Expand Up @@ -96,6 +97,7 @@ var (
alertModeListen,
alertModePoll,
}, ","))
errPageSizeIncorrect = errors.New("page size must be a value between 1 and 500")

// Logs Receiver Errors
errNoProjects = errors.New("at least one 'project' must be specified")
Expand Down Expand Up @@ -152,12 +154,18 @@ func (a AlertConfig) validatePollConfig() error {
return errNoProjects
}

// based off API limits https://www.mongodb.com/docs/atlas/reference/api/alerts-get-all-alerts/
if 0 >= a.PageSize || a.PageSize > 500 {
return errPageSizeIncorrect
}

var errs error
for _, project := range a.Projects {
if len(project.ExcludeClusters) != 0 && len(project.IncludeClusters) != 0 {
errs = multierr.Append(errs, errClusterConfig)
}
}

return errs
}

Expand Down
19 changes: 19 additions & 0 deletions receiver/mongodbatlasreceiver/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,7 @@ func TestValidate(t *testing.T) {
IncludeClusters: []string{"cluster2"},
},
},
PageSize: defaultAlertsPageSize,
},
},
expectedErr: errClusterConfig.Error(),
Expand All @@ -175,6 +176,7 @@ func TestValidate(t *testing.T) {
Enabled: true,
Mode: alertModePoll,
Projects: []*ProjectConfig{},
PageSize: defaultAlertsPageSize,
},
},
expectedErr: errNoProjects.Error(),
Expand All @@ -190,6 +192,7 @@ func TestValidate(t *testing.T) {
Name: "Project1",
},
},
PageSize: defaultAlertsPageSize,
},
},
},
Expand All @@ -204,6 +207,22 @@ func TestValidate(t *testing.T) {
},
expectedErr: errNoModeRecognized.Error(),
},
{
name: "Invalid Page Size",
input: Config{
Alerts: AlertConfig{
Enabled: true,
Mode: alertModePoll,
Projects: []*ProjectConfig{
{
Name: "Test",
},
},
PageSize: -1,
},
},
expectedErr: errPageSizeIncorrect.Error(),
},
}

for _, tc := range testCases {
Expand Down
9 changes: 5 additions & 4 deletions receiver/mongodbatlasreceiver/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,10 +98,11 @@ func createDefaultConfig() config.Receiver {
RetrySettings: exporterhelper.NewDefaultRetrySettings(),
Metrics: metadata.DefaultMetricsSettings(),
Alerts: AlertConfig{
Enabled: defaultAlertsEnabled,
Mode: alertModeListen,
PollInterval: defaultAlertsPollInterval,
MaxAlertProcessing: defaultMaxAlerts,
Enabled: defaultAlertsEnabled,
Mode: alertModeListen,
PollInterval: defaultAlertsPollInterval,
PageSize: defaultAlertsPageSize,
MaxPages: defaultAlertsMaxPages,
},
Logs: LogConfig{
Enabled: defaultLogsEnabled,
Expand Down
Loading