Skip to content

Commit

Permalink
Codesmon/exporter/azuremonitor/persistent queue (#26258)
Browse files Browse the repository at this point in the history
Description:
Added a new config item to support the QueueSettings values.
Extended the exportHelper.New[Metrics|Logs|Traces]Exporter call to pass
in the QueueSettings config, thus enabling persistent_queue for this
exporter.

Link to tracking Issue:
Fixes issue
#25859

Testing:
Extending unit tests to check configuration changes are picked up.

Documentation:
Added sending_queue config items to README.md's configuration section.
  • Loading branch information
codesmon authored Nov 8, 2023
1 parent 902b1a9 commit c26444b
Show file tree
Hide file tree
Showing 9 changed files with 79 additions and 9 deletions.
27 changes: 27 additions & 0 deletions .chloggen/codesmon_exporter_azuremonitor_persistent_queue.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: azuremonitorexporter

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Extended Azure Monitor exporter to support persistent queue. Default is for QueueSettings.Enabled to be false.

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [25859]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:

# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: [user]
5 changes: 5 additions & 0 deletions exporter/azuremonitorexporter/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,11 @@ The following settings can be optionally configured:
- `maxbatchsize` (default = 1024): The maximum number of telemetry items that can be submitted in each request. If this many items are buffered, the buffer will be flushed before `maxbatchinterval` expires.
- `maxbatchinterval` (default = 10s): The maximum time to wait before sending a batch of telemetry.
- `spaneventsenabled` (default = false): Enables export of span events.
- `sending_queue`
- `enabled` (default = false)
- `num_consumers` (default = 10): Number of consumers that dequeue batches; ignored if `enabled` is `false`
- `queue_size` (default = 1000): Maximum number of batches kept in memory before data; ignored if `enabled` is `false`
- `storage` (default = `none`): When set, enables persistence and uses the component specified as a storage extension for the persistent queue

Example:

Expand Down
14 changes: 8 additions & 6 deletions exporter/azuremonitorexporter/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,16 @@ import (
"time"

"go.opentelemetry.io/collector/config/configopaque"
"go.opentelemetry.io/collector/exporter/exporterhelper"
)

// Config defines configuration for Azure Monitor
type Config struct {
Endpoint string `mapstructure:"endpoint"`
ConnectionString configopaque.String `mapstructure:"connection_string"`
InstrumentationKey configopaque.String `mapstructure:"instrumentation_key"`
MaxBatchSize int `mapstructure:"maxbatchsize"`
MaxBatchInterval time.Duration `mapstructure:"maxbatchinterval"`
SpanEventsEnabled bool `mapstructure:"spaneventsenabled"`
exporterhelper.QueueSettings `mapstructure:"sending_queue"`
Endpoint string `mapstructure:"endpoint"`
ConnectionString configopaque.String `mapstructure:"connection_string"`
InstrumentationKey configopaque.String `mapstructure:"instrumentation_key"`
MaxBatchSize int `mapstructure:"maxbatchsize"`
MaxBatchInterval time.Duration `mapstructure:"maxbatchinterval"`
SpanEventsEnabled bool `mapstructure:"spaneventsenabled"`
}
9 changes: 9 additions & 0 deletions exporter/azuremonitorexporter/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"github.com/stretchr/testify/require"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/confmap/confmaptest"
"go.opentelemetry.io/collector/exporter/exporterhelper"

"github.com/open-telemetry/opentelemetry-collector-contrib/exporter/azuremonitorexporter/internal/metadata"
)
Expand All @@ -22,6 +23,8 @@ func TestLoadConfig(t *testing.T) {
cm, err := confmaptest.LoadConf(filepath.Join("testdata", "config.yaml"))
require.NoError(t, err)

disk := component.NewIDWithName("disk", "")

tests := []struct {
id component.ID
expected component.Config
Expand All @@ -40,6 +43,12 @@ func TestLoadConfig(t *testing.T) {
MaxBatchSize: 100,
MaxBatchInterval: 10 * time.Second,
SpanEventsEnabled: false,
QueueSettings: exporterhelper.QueueSettings{
QueueSize: 1000,
Enabled: true,
NumConsumers: 10,
StorageID: &disk,
},
},
},
}
Expand Down
2 changes: 2 additions & 0 deletions exporter/azuremonitorexporter/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/config/configopaque"
"go.opentelemetry.io/collector/exporter"
"go.opentelemetry.io/collector/exporter/exporterhelper"
"go.uber.org/zap"

"github.com/open-telemetry/opentelemetry-collector-contrib/exporter/azuremonitorexporter/internal/metadata"
Expand Down Expand Up @@ -49,6 +50,7 @@ func createDefaultConfig() component.Config {
MaxBatchSize: 1024,
MaxBatchInterval: 10 * time.Second,
SpanEventsEnabled: false,
QueueSettings: exporterhelper.NewDefaultQueueSettings(),
}
}

Expand Down
8 changes: 7 additions & 1 deletion exporter/azuremonitorexporter/logexporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,5 +47,11 @@ func newLogsExporter(config *Config, transportChannel transportChannel, set expo
logger: set.Logger,
}

return exporterhelper.NewLogsExporter(context.TODO(), set, config, exporter.onLogData)
return exporterhelper.NewLogsExporter(
context.TODO(),
set,
config,
exporter.onLogData,
exporterhelper.WithQueue(config.QueueSettings),
)
}
7 changes: 6 additions & 1 deletion exporter/azuremonitorexporter/metricexporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,5 +49,10 @@ func newMetricsExporter(config *Config, transportChannel transportChannel, set e
packer: newMetricPacker(set.Logger),
}

return exporterhelper.NewMetricsExporter(context.TODO(), set, config, exporter.onMetricData)
return exporterhelper.NewMetricsExporter(
context.TODO(),
set,
config,
exporter.onMetricData,
exporterhelper.WithQueue(config.QueueSettings))
}
9 changes: 9 additions & 0 deletions exporter/azuremonitorexporter/testdata/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -10,3 +10,12 @@ azuremonitor/2:
maxbatchsize: 100
# maxbatchinterval is the maximum time to wait before calling the configured endpoint.
maxbatchinterval: 10s

sending_queue:
# queue_size is the maximum number of items that can be queued before dropping data
queue_size: 1000
enabled: true
num_consumers: 10
storage: disk

disk/3:
7 changes: 6 additions & 1 deletion exporter/azuremonitorexporter/traceexporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,5 +70,10 @@ func newTracesExporter(config *Config, transportChannel transportChannel, set ex
logger: set.Logger,
}

return exporterhelper.NewTracesExporter(context.TODO(), set, config, exporter.onTraceData)
return exporterhelper.NewTracesExporter(
context.TODO(),
set,
config,
exporter.onTraceData,
exporterhelper.WithQueue(config.QueueSettings))
}

0 comments on commit c26444b

Please sign in to comment.