Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[exporter/awscloudwatchlogsexporter] Improve performance of the awscloudwatchlogs exporter #26692

Merged
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
406ce61
Chore: Rename PusherKey to StreamKey
rapphil Sep 13, 2023
7ee35f6
Chore: Move StreamKey inside event
rapphil Sep 13, 2023
1e55b06
Chore: Refactor to properly initialize the event
rapphil Sep 13, 2023
7d4c305
Feat: Add cwlogs Pusher able to send events to multiple streams
rapphil Sep 13, 2023
10b4638
Feat: use MultiStream pusher in awscloudwatchlogsexporter
rapphil Sep 14, 2023
8b3e0f4
Feat: Add support to consumers in cloudwatchlogs
rapphil Sep 13, 2023
1d9e0e2
Feat: Remove locks from cloudwatch logs pusher
rapphil Sep 14, 2023
eb38388
Feat: Remove throttling limiter
rapphil Sep 13, 2023
069f471
Feat: Stop using sequence token
rapphil Sep 14, 2023
51ddd00
Feat: Send log events directly to the Pusher
rapphil Sep 14, 2023
034b8ca
chore: Update readme
rapphil Sep 14, 2023
fb01288
Chore: update changelog
rapphil Sep 14, 2023
d189d48
Merge branch 'main' into rapphil-parallel-awscloudwatchlogsexporter
rapphil Sep 14, 2023
caf1be6
Merge branch 'main' into rapphil-parallel-awscloudwatchlogsexporter
rapphil Sep 14, 2023
0b4dbea
Chore: fix wrong dependency
rapphil Sep 14, 2023
86e2c38
Merge branch 'main' into rapphil-parallel-awscloudwatchlogsexporter
rapphil Sep 18, 2023
bd7d4a1
Merge branch 'main' into rapphil-parallel-awscloudwatchlogsexporter
rapphil Sep 21, 2023
c9f98b1
Merge branch 'main' into rapphil-parallel-awscloudwatchlogsexporter
rapphil Sep 21, 2023
e6383ab
Merge branch 'main' into rapphil-parallel-awscloudwatchlogsexporter
rapphil Oct 3, 2023
ccd669c
Merge branch 'main' into rapphil-parallel-awscloudwatchlogsexporter
rapphil Oct 6, 2023
8f183e2
Update internal/aws/cwlogs/pusher.go
rapphil Oct 6, 2023
fb93d1b
Use queue validation from exporterhelper
rapphil Oct 6, 2023
698e581
Simplify error handling and avoid uncessary code
rapphil Oct 9, 2023
0cc848c
Merge branch 'main' into rapphil-parallel-awscloudwatchlogsexporter
rapphil Oct 9, 2023
eb608f6
Chore: Refactor pusher and cwlog_client to not use stream token
rapphil Oct 9, 2023
0489473
Chore: minor refactoring
rapphil Oct 9, 2023
f20ffae
Merge branch 'main' into rapphil-parallel-awscloudwatchlogsexporter
rapphil Oct 9, 2023
dec0f2d
Merge branch 'main' into rapphil-parallel-awscloudwatchlogsexporter
rapphil Oct 10, 2023
c799e53
Merge branch 'main' into rapphil-parallel-awscloudwatchlogsexporter
rapphil Oct 10, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 7 additions & 15 deletions exporter/awscloudwatchlogsexporter/exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@
params,
config,
logsPusher.consumeLogs,
exporterhelper.WithQueue(expConfig.QueueSettings),

Check warning on line 95 in exporter/awscloudwatchlogsexporter/exporter.go

View check run for this annotation

Codecov / codecov/patch

exporter/awscloudwatchlogsexporter/exporter.go#L95

Added line #L95 was not covered by tests
exporterhelper.WithRetry(expConfig.RetrySettings),
exporterhelper.WithCapabilities(consumer.Capabilities{MutatesData: false}),
exporterhelper.WithShutdown(logsPusher.shutdown),
Expand All @@ -101,25 +101,21 @@

func (e *cwlExporter) consumeLogs(_ context.Context, ld plog.Logs) error {
pusher := e.pusherFactory.CreateMultiStreamPusher()
var errs []error
var errs error

err := pushLogsToCWLogs(e.logger, ld, e.Config, pusher)

if err != nil {
errs = append(errs, fmt.Errorf("Error pushing logs: %w", err))
errs = errors.Join(errs, fmt.Errorf("Error pushing logs: %w", err))
}

err = pusher.ForceFlush()

if err != nil {
errs = append(errs, fmt.Errorf("Error flushing logs: %w", err))
errs = errors.Join(errs, fmt.Errorf("Error flushing logs: %w", err))
}

if len(errs) != 0 {
return errors.Join(errs...)
}

return nil
return errs
bryan-aguilar marked this conversation as resolved.
Show resolved Hide resolved
}

func (e *cwlExporter) shutdown(_ context.Context) error {
Expand All @@ -130,10 +126,10 @@
n := ld.ResourceLogs().Len()

if n == 0 {
return nil

Check warning on line 129 in exporter/awscloudwatchlogsexporter/exporter.go

View check run for this annotation

Codecov / codecov/patch

exporter/awscloudwatchlogsexporter/exporter.go#L129

Added line #L129 was not covered by tests
}

var errs []error
var errs error

rls := ld.ResourceLogs()
for i := 0; i < rls.Len(); i++ {
Expand All @@ -152,18 +148,14 @@
} else {
err := pusher.AddLogEntry(event)
if err != nil {
errs = append(errs, err)
errs = errors.Join(errs, err)
}
}
}
}
}

if len(errs) != 0 {
return errors.Join(errs...)
}

return nil
return errs
}

type cwLogBody struct {
Expand Down
5 changes: 2 additions & 3 deletions internal/aws/cwlogs/pusher.go
Original file line number Diff line number Diff line change
Expand Up @@ -328,16 +328,15 @@

func (m *multiStreamPusher) AddLogEntry(event *Event) error {
if err := m.logStreamManager.InitStream(event.StreamKey); err != nil {
return err
}

Check warning on line 332 in internal/aws/cwlogs/pusher.go

View check run for this annotation

Codecov / codecov/patch

internal/aws/cwlogs/pusher.go#L331-L332

Added lines #L331 - L332 were not covered by tests

var pusher Pusher
var ok bool

if _, ok := m.pusherMap[event.StreamKey]; !ok {
if pusher, ok = m.pusherMap[event.StreamKey]; !ok {
pusher = NewPusher(event.StreamKey, 1, m.client, m.logger)
m.pusherMap[event.StreamKey] = pusher
} else {
pusher = m.pusherMap[event.StreamKey]
}

return pusher.AddLogEntry(event)
Expand All @@ -349,13 +348,13 @@
for _, val := range m.pusherMap {
err := val.ForceFlush()
if err != nil {
errs = append(errs, err)
}

Check warning on line 352 in internal/aws/cwlogs/pusher.go

View check run for this annotation

Codecov / codecov/patch

internal/aws/cwlogs/pusher.go#L351-L352

Added lines #L351 - L352 were not covered by tests
}

if len(errs) != 0 {
return errors.Join(errs...)
}

Check warning on line 357 in internal/aws/cwlogs/pusher.go

View check run for this annotation

Codecov / codecov/patch

internal/aws/cwlogs/pusher.go#L356-L357

Added lines #L356 - L357 were not covered by tests

return nil
}
Expand Down
Loading