Skip to content

Commit

Permalink
[awss3exporter]: add Sumo Logic marshaler for logs in Installed Colle…
Browse files Browse the repository at this point in the history
…ctor format (#23649)

**Description:** This PR adds a new marshaller for `awss3` exporter. It
exports logs in the format of Sumo Logic Installed Collector. Metrics
and traces are not supported - creating an exporter for them will result
in an error.
Currently, nested typed (eg. map inside a map) might not be supported
correctly - I have to confirm the IC's behavior with them, but I wanted
to create the PR so that it can be reviewed early.

**Link to tracking Issue:** #23212

**Testing:** Unit tests and manual e2e tests. Some automatic e2e tests
will come later, but they will not be part of this repo, they will be a
test for integrating the ingest with Sumo Logic's backend.

**Documentation:** Readme updated.

---------

Signed-off-by: Katarzyna Kujawa <[email protected]>
Co-authored-by: Katarzyna Kujawa <[email protected]>
  • Loading branch information
aboguszewski-sumo and kasia-kujawa authored Aug 7, 2023
1 parent ce454b5 commit 74cffb5
Show file tree
Hide file tree
Showing 11 changed files with 465 additions and 1 deletion.
20 changes: 20 additions & 0 deletions .chloggen/awss3-sumologic-ic-marshaller.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
# Use this changelog template to create an entry for release notes.
# If your change doesn't affect end users, such as a test fix or a tooling change,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: awss3exporter

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: add Sumo Logic Installed Collector marshaler

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [23212]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:
10 changes: 9 additions & 1 deletion exporter/awss3exporter/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,17 @@ The following exporter configuration parameters are supported.
| `s3_prefix` | prefix for the S3 key (root directory inside bucket). | |
| `s3_partition` | time granularity of S3 key: hour or minute | "minute" |
| `file_prefix` | file prefix defined by user | |
| `marshaler` | marshaler used to produce output data otlp_json | |
| `marshaler` | marshaler used to produce output data | `otlp_json` |
| `endpoint` | overrides the endpoint used by the exporter instead of constructing it from `region` and `s3_bucket` | |

### Marshaler

Marshaler determines the format of data sent to AWS S3. Currently, the following marshalers are implemented:

- `otlp_json` (default): the [OpenTelemetry Protocol format](https://github.com/open-telemetry/opentelemetry-proto), represented as json.
- `sumo_ic`: the [Sumo Logic Installed Collector Archive format](https://help.sumologic.com/docs/manage/data-archiving/archive/).
**This format is supported only for logs.**

# Example Configuration

Following example configuration defines to store output in 'eu-central' region and bucket named 'databucket'.
Expand Down
1 change: 1 addition & 0 deletions exporter/awss3exporter/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ type MarshalerType string

const (
OtlpJSON MarshalerType = "otlp_json"
SumoIC MarshalerType = "sumo_ic"
)

// Config contains the main configuration options for the s3 exporter
Expand Down
26 changes: 26 additions & 0 deletions exporter/awss3exporter/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,3 +124,29 @@ func TestConfig_Validate(t *testing.T) {
})
}
}

func TestMarshallerName(t *testing.T) {
factories, err := otelcoltest.NopFactories()
assert.Nil(t, err)

factory := NewFactory()
factories.Exporters[factory.Type()] = factory
cfg, err := otelcoltest.LoadConfigAndValidate(
filepath.Join("testdata", "marshaler.yaml"), factories)

require.NoError(t, err)
require.NotNil(t, cfg)

e := cfg.Exporters[component.NewID("awss3")].(*Config)

assert.Equal(t, e,
&Config{
S3Uploader: S3UploaderConfig{
Region: "us-east-1",
S3Bucket: "foo",
S3Partition: "minute",
},
MarshalerName: "sumo_ic",
},
)
}
9 changes: 9 additions & 0 deletions exporter/awss3exporter/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ package awss3exporter // import "github.com/open-telemetry/opentelemetry-collect

import (
"context"
"fmt"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/exporter"
Expand Down Expand Up @@ -57,6 +58,10 @@ func createMetricsExporter(ctx context.Context,
return nil, err
}

if config.(*Config).MarshalerName == SumoIC {
return nil, fmt.Errorf("metrics are not supported by sumo_ic output format")
}

return exporterhelper.NewMetricsExporter(ctx, params,
config,
s3Exporter.ConsumeMetrics)
Expand All @@ -71,6 +76,10 @@ func createTracesExporter(ctx context.Context,
return nil, err
}

if config.(*Config).MarshalerName == SumoIC {
return nil, fmt.Errorf("traces are not supported by sumo_ic output format")
}

return exporterhelper.NewTracesExporter(ctx,
params,
config,
Expand Down
18 changes: 18 additions & 0 deletions exporter/awss3exporter/factory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,3 +48,21 @@ func TestCreateLogsExporter(t *testing.T) {
assert.NoError(t, err)
require.NotNil(t, exp)
}

func TestUnsupportedMarshalerOptions(t *testing.T) {
cfg := createDefaultConfig()
cfg.(*Config).MarshalerName = SumoIC
exp, err := createMetricsExporter(
context.Background(),
exportertest.NewNopCreateSettings(),
cfg)
assert.Error(t, err)
require.Nil(t, exp)

exp2, err := createTracesExporter(
context.Background(),
exportertest.NewNopCreateSettings(),
cfg)
assert.Error(t, err)
require.Nil(t, exp2)
}
4 changes: 4 additions & 0 deletions exporter/awss3exporter/marshaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,10 @@ func NewMarshaler(mType MarshalerType, logger *zap.Logger) (marshaler, error) {
marshaler.tracesMarshaler = &ptrace.JSONMarshaler{}
marshaler.metricsMarshaler = &pmetric.JSONMarshaler{}
marshaler.fileFormat = "json"
case SumoIC:
sumomarshaler := newSumoICMarshaler()
marshaler.logsMarshaler = &sumomarshaler
marshaler.fileFormat = "json.gz"
default:
return nil, ErrUnknownMarshaler
}
Expand Down
6 changes: 6 additions & 0 deletions exporter/awss3exporter/marshaler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,12 @@ func TestMarshaler(t *testing.T) {
require.NotNil(t, m)
assert.Equal(t, m.format(), "json")
}
{
m, err := NewMarshaler("sumo_ic", zap.NewNop())
assert.NoError(t, err)
require.NotNil(t, m)
assert.Equal(t, m.format(), "json.gz")
}
{
m, err := NewMarshaler("unknown", zap.NewNop())
assert.Error(t, err)
Expand Down
162 changes: 162 additions & 0 deletions exporter/awss3exporter/sumo_marshaler.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,162 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package awss3exporter // import "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/awss3exporter"

import (
"bytes"
"encoding/json"
"errors"
"fmt"
"strconv"
"strings"

"go.opentelemetry.io/collector/pdata/pcommon"
"go.opentelemetry.io/collector/pdata/plog"
"go.opentelemetry.io/collector/pdata/pmetric"
"go.opentelemetry.io/collector/pdata/ptrace"
)

const (
logBodyKey = "log"
)

type sumoMarshaler struct{}

func (*sumoMarshaler) format() string {
return string(SumoIC)
}

func newSumoICMarshaler() sumoMarshaler {
return sumoMarshaler{}
}

func logEntry(buf *bytes.Buffer, format string, a ...interface{}) {
buf.WriteString(fmt.Sprintf(format, a...))
buf.WriteString("\n")
}

func attributeValueToString(v pcommon.Value) (string, error) {
switch v.Type() {
case pcommon.ValueTypeStr:
return v.Str(), nil
case pcommon.ValueTypeBool:
return strconv.FormatBool(v.Bool()), nil
case pcommon.ValueTypeBytes:
return valueToJSON(v.Bytes().AsRaw())
case pcommon.ValueTypeDouble:
return strconv.FormatFloat(v.Double(), 'f', -1, 64), nil
case pcommon.ValueTypeInt:
return strconv.FormatInt(v.Int(), 10), nil
case pcommon.ValueTypeSlice:
return valueToJSON(v.Slice().AsRaw())
case pcommon.ValueTypeMap:
return valueToJSON(v.Map().AsRaw())
case pcommon.ValueTypeEmpty:
return "", nil
default:
return "", fmt.Errorf("unknown OpenTelemetry attribute value type: %q", v.Type())
}
}

func valueToJSON(m any) (string, error) {
jsonString := new(bytes.Buffer)
enc := json.NewEncoder(jsonString)
err := enc.Encode(m)

return strings.Trim(jsonString.String(), "\n"), err
}

const (
SourceCategoryKey = "_sourceCategory"
SourceHostKey = "_sourceHost"
SourceNameKey = "_sourceName"
)

func (sumoMarshaler) MarshalLogs(ld plog.Logs) ([]byte, error) {
buf := bytes.Buffer{}
rls := ld.ResourceLogs()
for i := 0; i < rls.Len(); i++ {
rl := rls.At(i)
ra := rl.Resource().Attributes()
sourceCategory, exists := ra.Get(SourceCategoryKey)
if !exists {
return nil, errors.New("_sourceCategory attribute does not exist")
}
sourceHost, exists := ra.Get(SourceHostKey)
if !exists {
return nil, errors.New("_sourceHost attribute does not exist")
}
sourceName, exists := ra.Get(SourceNameKey)
if !exists {
return nil, errors.New("_sourceName attribute does not exist")
}

sc, err := attributeValueToString(sourceCategory)
if err != nil {
return nil, err
}
sh, err := attributeValueToString(sourceHost)
if err != nil {
return nil, err
}
sn, err := attributeValueToString(sourceName)
if err != nil {
return nil, err
}
sc = strconv.Quote(sc)
sh = strconv.Quote(sh)
sn = strconv.Quote(sn)

// Remove the source attributes so that they won't be included in "fields" value.
ra.Remove(SourceCategoryKey)
ra.Remove(SourceHostKey)
ra.Remove(SourceNameKey)

fields, err := valueToJSON(ra.AsRaw())
if err != nil {
return nil, err
}

ills := rl.ScopeLogs()
for j := 0; j < ills.Len(); j++ {
ils := ills.At(j)
logs := ils.LogRecords()
for k := 0; k < logs.Len(); k++ {
lr := logs.At(k)
dateVal := lr.ObservedTimestamp()

message, err := getMessageJSON(lr)
if err != nil {
return nil, err
}

logEntry(&buf, "{\"date\": \"%s\",\"sourceName\":%s,\"sourceHost\":%s,\"sourceCategory\":%s,\"fields\":%s,\"message\":%s}",
dateVal, sn, sh, sc, fields, message)
}
}
}
return buf.Bytes(), nil
}

func getMessageJSON(lr plog.LogRecord) (string, error) {
// The "message" fields is a JSON created from combining the actual log body and log-level attributes,
// where the log body is stored under "log" key.
// More info:
// https://help.sumologic.com/docs/send-data/opentelemetry-collector/data-source-configurations/additional-configurations-reference/#mapping-opentelemetry-concepts-to-sumo-logic
message := new(bytes.Buffer)
enc := json.NewEncoder(message)

lr.Body().CopyTo(lr.Attributes().PutEmpty(logBodyKey))
err := enc.Encode(lr.Attributes().AsRaw())

return strings.Trim(message.String(), "\n"), err
}

func (s sumoMarshaler) MarshalTraces(_ ptrace.Traces) ([]byte, error) {
return nil, fmt.Errorf("traces can't be marshaled into %s format", s.format())
}

func (s sumoMarshaler) MarshalMetrics(_ pmetric.Metrics) ([]byte, error) {
return nil, fmt.Errorf("metrics can't be marshaled into %s format", s.format())
}
Loading

0 comments on commit 74cffb5

Please sign in to comment.