Skip to content

Commit

Permalink
[exporter/opensearch] Send logs to Opensearch (#26475)
Browse files Browse the repository at this point in the history
## Description: <Describe what has changed.>
<!--Ex. Fixing a bug - Describe the bug and how this fixes the issue.
Ex. Adding a feature - Explain what this achieves.-->
Implementation of exporter to OpenSearch using opensearch-go library. As
of now, this PR was heavily inspired by
https://github.com/dbason/opentelemetry-collector-contrib/tree/opensearch-exporter/exporter/opensearchexporter.

By default, requests sent adhere to the OpenSearch Catalog [schema for
logs](https://github.com/opensearch-project/opensearch-catalog/tree/main/schema/observability/logs),
but allows users to export using the Elastic Common Schema as well.

This PR also:
- enables users to define the `bulk_action` between `create` and `index`
- enables users to define the logs index without necessarily adhering to
the new [index naming
conventions](opensearch-project/observability#1405)
through the `LogsIndex` config.

## Tracking Issue: 


[23611](#23611)

## Testing: <Describe what testing was performed and which tests were
added.>

### Integration
- Successful round-trip to HTTP endpoint,
- Permanent error during round-trip,
- Retryable error response for first request, followed by successful
response on retry,
- Two retriable error responses, followed by successful response on
second retry.

### Manual
- Authentication using `configtls.TLSSetting` (`ca_file`, `cert_file`,
`key_file`)
- Tested in EKS and K3s clusters running
[opni](https://github.com/rancher/opni).

---------

Signed-off-by: João Henri <[email protected]>
Signed-off-by: João Henri <[email protected]>
  • Loading branch information
jaehnri authored Oct 27, 2023
1 parent 7772e02 commit e8b0e2a
Show file tree
Hide file tree
Showing 21 changed files with 1,998 additions and 123 deletions.
10 changes: 10 additions & 0 deletions .chloggen/add-logs-opensearch-exporter.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
# Use this changelog template to create an entry for release notes.

change_type: enhancement
component: opensearchexporter

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Add log exporting capability to the opensearchexporter.

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [23611]
5 changes: 4 additions & 1 deletion exporter/opensearchexporter/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
<!-- status autogenerated section -->
| Status | |
| ------------- |-----------|
| Stability | [development]: traces |
| Stability | [development]: logs, traces |
| Distributions | [] |
| Issues | [![Open issues](https://img.shields.io/github/issues-search/open-telemetry/opentelemetry-collector-contrib?query=is%3Aissue%20is%3Aopen%20label%3Aexporter%2Fopensearch%20&label=open&color=orange&logo=opentelemetry)](https://github.com/open-telemetry/opentelemetry-collector-contrib/issues?q=is%3Aopen+is%3Aissue+label%3Aexporter%2Fopensearch) [![Closed issues](https://img.shields.io/github/issues-search/open-telemetry/opentelemetry-collector-contrib?query=is%3Aissue%20is%3Aclosed%20label%3Aexporter%2Fopensearch%20&label=closed&color=blue&logo=opentelemetry)](https://github.com/open-telemetry/opentelemetry-collector-contrib/issues?q=is%3Aclosed+is%3Aissue+label%3Aexporter%2Fopensearch) |
| [Code Owners](https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/main/CONTRIBUTING.md#becoming-a-code-owner) | [@Aneurysm9](https://www.github.com/Aneurysm9), [@MitchellGale](https://www.github.com/MitchellGale), [@MaxKsyunz](https://www.github.com/MaxKsyunz), [@YANG-DB](https://www.github.com/YANG-DB) |
Expand Down Expand Up @@ -32,6 +32,9 @@ Supports standard TLS settings as part of HTTP settings. See [TLS Configuration/

### Timeout Options
- `timeout` : See [timeout](https://github.com/open-telemetry/opentelemetry-collector/blob/main/exporter/exporterhelper/README.md)

### Bulk Indexer Options
- `bulk_action` (optional): the [action](https://opensearch.org/docs/2.9/api-reference/document-apis/bulk/) for ingesting data. Only `create` and `index` are allowed here.
## Example

```yaml
Expand Down
112 changes: 107 additions & 5 deletions exporter/opensearchexporter/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ package opensearchexporter // import "github.com/open-telemetry/opentelemetry-co

import (
"errors"
"strings"

"go.opentelemetry.io/collector/config/confighttp"
"go.opentelemetry.io/collector/exporter/exporterhelper"
Expand All @@ -16,34 +17,135 @@ const (

// defaultDataset value is used as ssoTracesExporter.Dataset when component.Config.Dataset is not set.
defaultDataset = "default"

// defaultBulkAction value is used when component.Config.BulkAction is not set.
defaultBulkAction = "create"

// defaultMappingMode value is used when component.Config.MappingSettings.Mode is not set.
defaultMappingMode = "ss4o"
)

// Config defines configuration for OpenSearch exporter.
type Config struct {
confighttp.HTTPClientSettings `mapstructure:"http"`
exporterhelper.RetrySettings `mapstructure:"retry_on_failure"`
exporterhelper.TimeoutSettings `mapstructure:",squash"`
Namespace string `mapstructure:"namespace"`
Dataset string `mapstructure:"dataset"`
MappingsSettings `mapstructure:"mapping"`

// The Observability indices would follow the recommended for immutable data stream ingestion pattern using
// the data_stream concepts. See https://opensearch.org/docs/latest/dashboards/im-dashboards/datastream/
// Index pattern will follow the next naming template ss4o_{type}-{dataset}-{namespace}
Dataset string `mapstructure:"dataset"`
Namespace string `mapstructure:"namespace"`

// LogsIndex configures the index, index alias, or data stream name logs should be indexed in.
// https://opensearch.org/docs/latest/im-plugin/index/
// https://opensearch.org/docs/latest/dashboards/im-dashboards/datastream/
LogsIndex string `mapstructure:"logs_index"`

// BulkAction configures the action for ingesting data. Only `create` and `index` are allowed here.
// If not specified, the default value `create` will be used.
BulkAction string `mapstructure:"bulk_action"`
}

var (
errConfigNoEndpoint = errors.New("endpoint must be specified")
errDatasetNoValue = errors.New("dataset must be specified")
errNamespaceNoValue = errors.New("namespace must be specified")
errConfigNoEndpoint = errors.New("endpoint must be specified")
errDatasetNoValue = errors.New("dataset must be specified")
errNamespaceNoValue = errors.New("namespace must be specified")
errBulkActionInvalid = errors.New("bulk_action can either be `create` or `index`")
errMappingModeInvalid = errors.New("mapping.mode is invalid")
)

type MappingsSettings struct {
// Mode configures the field mappings.
// Supported modes are the following:
//
// ss4o: exports logs in the Simple Schema for Observability standard.
// This mode is enabled by default.
// See: https://opensearch.org/docs/latest/observing-your-data/ss4o/
//
// ecs: maps fields defined in the OpenTelemetry Semantic Conventions
// to the Elastic Common Schema.
// See: https://www.elastic.co/guide/en/ecs/current/index.html
//
// flatten_attributes: uses the ECS mapping but flattens all resource and
// log attributes in the record to the top-level.
Mode string `mapstructure:"mode"`

// Additional field mappings.
Fields map[string]string `mapstructure:"fields"`

// File to read additional fields mappings from.
File string `mapstructure:"file"`

// Field to store timestamp in. If not set uses the default @timestamp
TimestampField string `mapstructure:"timestamp_field"`

// Whether to store timestamp in Epoch miliseconds
UnixTimestamp bool `mapstructure:"unix_timestamp"`

// Try to find and remove duplicate fields
Dedup bool `mapstructure:"dedup"`

Dedot bool `mapstructure:"dedot"`
}

type MappingMode int

// Enum values for MappingMode.
const (
MappingSS4O MappingMode = iota
MappingECS
MappingFlattenAttributes
)

func (m MappingMode) String() string {
switch m {
case MappingSS4O:
return "ss4o"
case MappingECS:
return "ecs"
case MappingFlattenAttributes:
return "flatten_attributes"
default:
return "ss4o"
}
}

var mappingModes = func() map[string]MappingMode {
table := map[string]MappingMode{}
for _, m := range []MappingMode{
MappingECS,
MappingSS4O,
MappingFlattenAttributes,
} {
table[strings.ToLower(m.String())] = m
}

return table
}()

// Validate validates the opensearch server configuration.
func (cfg *Config) Validate() error {
var multiErr []error
if len(cfg.Endpoint) == 0 {
multiErr = append(multiErr, errConfigNoEndpoint)
}

if len(cfg.Dataset) == 0 {
multiErr = append(multiErr, errDatasetNoValue)
}
if len(cfg.Namespace) == 0 {
multiErr = append(multiErr, errNamespaceNoValue)
}

if cfg.BulkAction != "create" && cfg.BulkAction != "index" {
return errBulkActionInvalid
}

if _, ok := mappingModes[cfg.MappingsSettings.Mode]; !ok {
multiErr = append(multiErr, errMappingModeInvalid)
}

return errors.Join(multiErr...)
}
15 changes: 15 additions & 0 deletions exporter/opensearchexporter/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ func TestLoadConfig(t *testing.T) {
sampleEndpoint := "https://opensearch.example.com:9200"
sampleCfg := withDefaultConfig(func(config *Config) {
config.Endpoint = sampleEndpoint
config.BulkAction = defaultBulkAction
})
maxIdleConns := 100
idleConnTimeout := 90 * time.Second
Expand Down Expand Up @@ -66,6 +67,10 @@ func TestLoadConfig(t *testing.T) {
Multiplier: 1.5,
RandomizationFactor: 0.5,
},
BulkAction: defaultBulkAction,
MappingsSettings: MappingsSettings{
Mode: "ss4o",
},
},
configValidateAssert: assert.NoError,
},
Expand All @@ -91,6 +96,16 @@ func TestLoadConfig(t *testing.T) {
return assert.ErrorContains(t, err, errNamespaceNoValue.Error())
},
},
{
id: component.NewIDWithName(metadata.Type, "invalid_bulk_action"),
expected: withDefaultConfig(func(config *Config) {
config.Endpoint = sampleEndpoint
config.BulkAction = "delete"
}),
configValidateAssert: func(t assert.TestingT, err error, i ...interface{}) bool {
return assert.ErrorContains(t, err, errBulkActionInvalid.Error())
},
},
}

for _, tt := range tests {
Expand Down
Loading

0 comments on commit e8b0e2a

Please sign in to comment.