Skip to content

Commit

Permalink
[receiver/mongodbatlasreceiver] Add support to collect organization e…
Browse files Browse the repository at this point in the history
…vents (#20343)
  • Loading branch information
JonathanWamsley authored Mar 31, 2023
1 parent fa1186e commit 6658646
Show file tree
Hide file tree
Showing 12 changed files with 288 additions and 30 deletions.
16 changes: 16 additions & 0 deletions .chloggen/mongodbatlasreceiver-add-org-events.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: mongodbatlasreceiver

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Add support to collect organization events.

# One or more tracking issues related to the change
issues: [19449, 20308]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:
13 changes: 11 additions & 2 deletions receiver/mongodbatlasreceiver/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,8 @@

Receives metrics from [MongoDB Atlas](https://www.mongodb.com/cloud/atlas)
via their [monitoring APIs](https://docs.atlas.mongodb.com/reference/api/monitoring-and-logs/),
as well as alerts via a configured [webhook](https://www.mongodb.com/docs/atlas/tutorial/third-party-service-integrations/).
as well as alerts via a configured [webhook](https://www.mongodb.com/docs/atlas/tutorial/third-party-service-integrations/)
and events from [events APIs](https://www.mongodb.com/docs/atlas/reference/api/events/).

## Getting Started

Expand All @@ -19,6 +20,10 @@ below both values are being pulled from the environment.

In order to collect logs, at least one project must be specified. By default, logs for all clusters within a project will be collected. Clusters can be limited using either the `include_clusters` or `exclude_clusters` setting.

In order to collect project events, the requesting API key needs the appropriate permission which at minimum is the `Project Read Only` role. Project events are specific to a single project.

In order to collect organization events, the requesting API key needs the appropriate permission which at minimum is the `Organization Member` role. Organization events are collected across all the projects hosted on Atlas within the organization. These events are not associated with a project.

MongoDB Atlas [Documentation](https://www.mongodb.com/docs/atlas/reference/api/logs/#logs) recommends a polling interval of 5 minutes.

- `public_key` (required for metrics, logs, or alerts in `poll` mode)
Expand Down Expand Up @@ -59,7 +64,9 @@ MongoDB Atlas [Documentation](https://www.mongodb.com/docs/atlas/reference/api/l
- `exclude_clusters` (default empty)
- `events`
- `projects`
- `name` Name of the Project to discover events from
- `name` Name of the Project to discover events from.
- `organizations`
- `id` ID of the Organization to discover events from.
- `poll_interval` (default `1m`)
- How often the receiver will poll the Events API for new events.
- `page_size` (default `100`)
Expand Down Expand Up @@ -129,6 +136,8 @@ receivers:
events:
projects:
- name: "project 1"
organizations:
- id: "5b478b3afc4625789ce616a3"
poll_interval: 1m
page_size: 100
max_pages: 25
Expand Down
20 changes: 13 additions & 7 deletions receiver/mongodbatlasreceiver/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,11 +66,12 @@ type LogConfig struct {

// EventsConfig is the configuration options for events collection
type EventsConfig struct {
Projects []*ProjectConfig `mapstructure:"projects"`
PollInterval time.Duration `mapstructure:"poll_interval"`
Types []string `mapstructure:"types"`
PageSize int64 `mapstructure:"page_size"`
MaxPages int64 `mapstructure:"max_pages"`
Projects []*ProjectConfig `mapstructure:"projects"`
Organizations []*OrgConfig `mapstructure:"organizations"`
PollInterval time.Duration `mapstructure:"poll_interval"`
Types []string `mapstructure:"types"`
PageSize int64 `mapstructure:"page_size"`
MaxPages int64 `mapstructure:"max_pages"`
}

type ProjectConfig struct {
Expand All @@ -83,6 +84,10 @@ type ProjectConfig struct {
excludesByClusterName map[string]struct{}
}

type OrgConfig struct {
ID string `mapstructure:"id"`
}

func (pc *ProjectConfig) populateIncludesAndExcludes() *ProjectConfig {
pc.includesByClusterName = map[string]struct{}{}
for _, inclusion := range pc.IncludeClusters {
Expand Down Expand Up @@ -111,6 +116,7 @@ var (

// Logs Receiver Errors
errNoProjects = errors.New("at least one 'project' must be specified")
errNoEvents = errors.New("at least one 'project' or 'organizations' event type must be specified")
errClusterConfig = errors.New("only one of 'include_clusters' or 'exclude_clusters' may be specified")
)

Expand Down Expand Up @@ -209,8 +215,8 @@ func (a AlertConfig) validateListenConfig() error {
}

func (e EventsConfig) validate() error {
if len(e.Projects) == 0 {
return errNoProjects
if len(e.Projects) == 0 && len(e.Organizations) == 0 {
return errNoEvents
}
return nil
}
7 changes: 6 additions & 1 deletion receiver/mongodbatlasreceiver/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -236,7 +236,7 @@ func TestValidate(t *testing.T) {
Projects: []*ProjectConfig{},
},
},
expectedErr: errNoProjects.Error(),
expectedErr: errNoEvents.Error(),
},
}

Expand Down Expand Up @@ -296,6 +296,11 @@ func TestLoadConfig(t *testing.T) {
Name: "Project 0",
},
},
Organizations: []*OrgConfig{
{
ID: "5b478b3afc4625789ce616a3",
},
},
PollInterval: time.Minute,
MaxPages: defaultEventsMaxPages,
PageSize: defaultEventsPageSize,
Expand Down
70 changes: 62 additions & 8 deletions receiver/mongodbatlasreceiver/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,9 @@ const (

type eventsClient interface {
GetProject(ctx context.Context, groupID string) (*mongodbatlas.Project, error)
GetEvents(ctx context.Context, groupID string, opts *internal.GetEventsOptions) (ret []*mongodbatlas.Event, nextPage bool, err error)
GetProjectEvents(ctx context.Context, groupID string, opts *internal.GetEventsOptions) (ret []*mongodbatlas.Event, nextPage bool, err error)
GetOrganization(ctx context.Context, orgID string) (*mongodbatlas.Organization, error)
GetOrganizationEvents(ctx context.Context, orgID string, opts *internal.GetEventsOptions) (ret []*mongodbatlas.Event, nextPage bool, err error)
}

type eventsReceiver struct {
Expand Down Expand Up @@ -142,14 +144,23 @@ func (er *eventsReceiver) pollEvents(ctx context.Context) error {
er.logger.Error("error retrieving project information for "+pc.Name+":", zap.Error(err))
return err
}
er.poll(ctx, project, pc, st, et)
er.pollProject(ctx, project, pc, st, et)
}

for _, pc := range er.cfg.Events.Organizations {
org, err := er.client.GetOrganization(ctx, pc.ID)
if err != nil {
er.logger.Error("error retrieving org information for "+pc.ID+":", zap.Error(err))
return err
}
er.pollOrg(ctx, org, pc, st, et)
}

er.record.NextStartTime = &et
return er.checkpoint(ctx)
}

func (er *eventsReceiver) poll(ctx context.Context, project *mongodbatlas.Project, p *ProjectConfig, startTime, now time.Time) {
func (er *eventsReceiver) pollProject(ctx context.Context, project *mongodbatlas.Project, p *ProjectConfig, startTime, now time.Time) {
for pageN := 1; pageN <= er.maxPages; pageN++ {
opts := &internal.GetEventsOptions{
PageNum: pageN,
Expand All @@ -158,18 +169,49 @@ func (er *eventsReceiver) poll(ctx context.Context, project *mongodbatlas.Projec
MinDate: startTime,
}

projectEvents, hasNext, err := er.client.GetEvents(ctx, project.ID, opts)
projectEvents, hasNext, err := er.client.GetProjectEvents(ctx, project.ID, opts)
if err != nil {
er.logger.Error("unable to get events for project", zap.Error(err), zap.String("project", p.Name))
break
}

now := pcommon.NewTimestampFromTime(now)
logs := er.transformEvents(now, projectEvents, project)
logs := er.transformProjectEvents(now, projectEvents, project)

if logs.LogRecordCount() > 0 {
if err = er.consumer.ConsumeLogs(ctx, logs); err != nil {
er.logger.Error("error consuming project events", zap.Error(err))
break
}
}

if !hasNext {
break
}
}
}

func (er *eventsReceiver) pollOrg(ctx context.Context, org *mongodbatlas.Organization, p *OrgConfig, startTime, now time.Time) {
for pageN := 1; pageN <= er.maxPages; pageN++ {
opts := &internal.GetEventsOptions{
PageNum: pageN,
EventTypes: er.cfg.Events.Types,
MaxDate: now,
MinDate: startTime,
}

organizationEvents, hasNext, err := er.client.GetOrganizationEvents(ctx, org.ID, opts)
if err != nil {
er.logger.Error("unable to get events for organization", zap.Error(err), zap.String("organization", p.ID))
break
}

now := pcommon.NewTimestampFromTime(now)
logs := er.transformOrgEvents(now, organizationEvents, org)

if logs.LogRecordCount() > 0 {
if err = er.consumer.ConsumeLogs(ctx, logs); err != nil {
er.logger.Error("error consuming events", zap.Error(err))
er.logger.Error("error consuming organization events", zap.Error(err))
break
}
}
Expand All @@ -180,13 +222,26 @@ func (er *eventsReceiver) poll(ctx context.Context, project *mongodbatlas.Projec
}
}

func (er *eventsReceiver) transformEvents(now pcommon.Timestamp, events []*mongodbatlas.Event, p *mongodbatlas.Project) plog.Logs {
func (er *eventsReceiver) transformProjectEvents(now pcommon.Timestamp, events []*mongodbatlas.Event, p *mongodbatlas.Project) plog.Logs {
logs := plog.NewLogs()
resourceLogs := logs.ResourceLogs().AppendEmpty()
ra := resourceLogs.Resource().Attributes()
ra.PutStr("mongodbatlas.project.name", p.Name)
ra.PutStr("mongodbatlas.org.id", p.OrgID)
er.transformEvents(now, events, &resourceLogs)
return logs
}

func (er *eventsReceiver) transformOrgEvents(now pcommon.Timestamp, events []*mongodbatlas.Event, o *mongodbatlas.Organization) plog.Logs {
logs := plog.NewLogs()
resourceLogs := logs.ResourceLogs().AppendEmpty()
ra := resourceLogs.Resource().Attributes()
ra.PutStr("mongodbatlas.org.id", o.ID)
er.transformEvents(now, events, &resourceLogs)
return logs
}

func (er *eventsReceiver) transformEvents(now pcommon.Timestamp, events []*mongodbatlas.Event, resourceLogs *plog.ResourceLogs) {
for _, event := range events {

logRecord := resourceLogs.ScopeLogs().AppendEmpty().LogRecords().AppendEmpty()
Expand Down Expand Up @@ -216,7 +271,6 @@ func (er *eventsReceiver) transformEvents(now pcommon.Timestamp, events []*mongo

parseOptionalAttributes(&attrs, event)
}
return logs
}

func (er *eventsReceiver) checkpoint(ctx context.Context) error {
Expand Down
50 changes: 43 additions & 7 deletions receiver/mongodbatlasreceiver/events_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,11 @@ func TestPoll(t *testing.T) {
Name: testProjectName,
},
},
Organizations: []*OrgConfig{
{
ID: testOrgID,
},
},
PollInterval: time.Second,
}

Expand All @@ -136,11 +141,17 @@ func TestPoll(t *testing.T) {
err = r.Shutdown(context.Background())
require.NoError(t, err)

expected, err := golden.ReadLogs(filepath.Join("testdata", "events", "golden", "events.yaml"))
expectedProjectLogs, err := golden.ReadLogs(filepath.Join("testdata", "events", "golden", "project-events.yaml"))
require.NoError(t, err)

expectedOrgLogs, err := golden.ReadLogs(filepath.Join("testdata", "events", "golden", "org-events.yaml"))
require.NoError(t, err)

logs := sink.AllLogs()[0]
require.NoError(t, plogtest.CompareLogs(expected, logs, plogtest.IgnoreObservedTimestamp()))
projectLogs := sink.AllLogs()[0]
require.NoError(t, plogtest.CompareLogs(expectedProjectLogs, projectLogs, plogtest.IgnoreObservedTimestamp()))

orgLogs := sink.AllLogs()[1]
require.NoError(t, plogtest.CompareLogs(expectedOrgLogs, orgLogs, plogtest.IgnoreObservedTimestamp()))
}

func TestProjectGetFailure(t *testing.T) {
Expand All @@ -151,13 +162,19 @@ func TestProjectGetFailure(t *testing.T) {
Name: "fake-project",
},
},
Organizations: []*OrgConfig{
{
ID: "fake-org",
},
},
PollInterval: time.Second,
}

sink := &consumertest.LogsSink{}
r := newEventsReceiver(receivertest.NewNopCreateSettings(), cfg, sink)
mClient := &mockEventsClient{}
mClient.On("GetProject", mock.Anything, "fake-project").Return(nil, fmt.Errorf("unable to get project: %d", http.StatusUnauthorized))
mClient.On("GetOrganization", mock.Anything, "fake-org").Return(nil, fmt.Errorf("unable to get org: %d", http.StatusUnauthorized))

err := r.Start(context.Background(), componenttest.NewNopHost(), storage.NewNopClient())
require.NoError(t, err)
Expand All @@ -176,7 +193,9 @@ type mockEventsClient struct {

func (mec *mockEventsClient) setupMock(t *testing.T) {
mec.setupGetProject()
mec.On("GetEvents", mock.Anything, mock.Anything, mock.Anything).Return(mec.loadTestEvents(t), false, nil)
mec.setupGetOrganization()
mec.On("GetProjectEvents", mock.Anything, mock.Anything, mock.Anything).Return(mec.loadTestEvents(t, "project-events.json"), false, nil)
mec.On("GetOrganizationEvents", mock.Anything, mock.Anything, mock.Anything).Return(mec.loadTestEvents(t, "org-events.json"), false, nil)
}

func (mec *mockEventsClient) setupGetProject() {
Expand All @@ -188,8 +207,15 @@ func (mec *mockEventsClient) setupGetProject() {
}, nil)
}

func (mec *mockEventsClient) loadTestEvents(t *testing.T) []*mongodbatlas.Event {
testEvents := filepath.Join("testdata", "events", "sample-payloads", "events.json")
func (mec *mockEventsClient) setupGetOrganization() {
mec.On("GetOrganization", mock.Anything, mock.Anything).Return(&mongodbatlas.Organization{
ID: testOrgID,
Links: []*mongodbatlas.Link{},
}, nil)
}

func (mec *mockEventsClient) loadTestEvents(t *testing.T, filename string) []*mongodbatlas.Event {
testEvents := filepath.Join("testdata", "events", "sample-payloads", filename)
eventBytes, err := os.ReadFile(testEvents)
require.NoError(t, err)

Expand All @@ -204,7 +230,17 @@ func (mec *mockEventsClient) GetProject(ctx context.Context, pID string) (*mongo
return args.Get(0).(*mongodbatlas.Project), args.Error(1)
}

func (mec *mockEventsClient) GetEvents(ctx context.Context, pID string, opts *internal.GetEventsOptions) ([]*mongodbatlas.Event, bool, error) {
func (mec *mockEventsClient) GetProjectEvents(ctx context.Context, pID string, opts *internal.GetEventsOptions) ([]*mongodbatlas.Event, bool, error) {
args := mec.Called(ctx, pID, opts)
return args.Get(0).([]*mongodbatlas.Event), args.Bool(1), args.Error(2)
}

func (mec *mockEventsClient) GetOrganization(ctx context.Context, oID string) (*mongodbatlas.Organization, error) {
args := mec.Called(ctx, oID)
return args.Get(0).(*mongodbatlas.Organization), args.Error(1)
}

func (mec *mockEventsClient) GetOrganizationEvents(ctx context.Context, oID string, opts *internal.GetEventsOptions) ([]*mongodbatlas.Event, bool, error) {
args := mec.Called(ctx, oID, opts)
return args.Get(0).([]*mongodbatlas.Event), args.Bool(1), args.Error(2)
}
28 changes: 26 additions & 2 deletions receiver/mongodbatlasreceiver/internal/mongodb_atlas_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -663,8 +663,8 @@ type GetEventsOptions struct {
MaxDate time.Time
}

// GetEvents returns the events specified for the set projects
func (s *MongoDBAtlasClient) GetEvents(ctx context.Context, groupID string, opts *GetEventsOptions) (ret []*mongodbatlas.Event, nextPage bool, err error) {
// GetProjectEvents returns the events specified for the set projects
func (s *MongoDBAtlasClient) GetProjectEvents(ctx context.Context, groupID string, opts *GetEventsOptions) (ret []*mongodbatlas.Event, nextPage bool, err error) {
lo := mongodbatlas.ListOptions{
PageNum: opts.PageNum,
ItemsPerPage: opts.PageSize,
Expand All @@ -687,6 +687,30 @@ func (s *MongoDBAtlasClient) GetEvents(ctx context.Context, groupID string, opts
return events.Results, hasNext(response.Links), nil
}

// GetOrgEvents returns the events specified for the set organizations
func (s *MongoDBAtlasClient) GetOrganizationEvents(ctx context.Context, orgID string, opts *GetEventsOptions) (ret []*mongodbatlas.Event, nextPage bool, err error) {
lo := mongodbatlas.ListOptions{
PageNum: opts.PageNum,
ItemsPerPage: opts.PageSize,
}
options := mongodbatlas.EventListOptions{
ListOptions: lo,
// Earliest Timestamp in ISO 8601 date and time format in UTC from when Atlas should return events.
MinDate: opts.MinDate.Format(time.RFC3339),
}

if len(opts.EventTypes) > 0 {
options.EventType = opts.EventTypes
}

events, response, err := s.client.Events.ListOrganizationEvents(ctx, orgID, &options)
err = checkMongoDBClientErr(err, response)
if err != nil {
return nil, false, err
}
return events.Results, hasNext(response.Links), nil
}

func toUnixString(t time.Time) string {
return strconv.Itoa(int(t.Unix()))
}
Loading

0 comments on commit 6658646

Please sign in to comment.