diff --git a/.chloggen/add_container_parser.yaml b/.chloggen/add_container_parser.yaml new file mode 100644 index 000000000000..b6b4406b8f43 --- /dev/null +++ b/.chloggen/add_container_parser.yaml @@ -0,0 +1,27 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: enhancement + +# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver) +component: filelogreceiver + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Add container operator parser + +# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists. +issues: [31959] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: + +# If your change doesn't affect end users or the exported elements of any package, +# you should instead start your pull request title with [chore] or use the "Skip Changelog" label. +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [user] diff --git a/pkg/stanza/adapter/register.go b/pkg/stanza/adapter/register.go index 8105ef17d587..426e456decfa 100644 --- a/pkg/stanza/adapter/register.go +++ b/pkg/stanza/adapter/register.go @@ -6,6 +6,7 @@ package adapter // import "github.com/open-telemetry/opentelemetry-collector-con import ( _ "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator/output/file" // Register parsers and transformers for stanza-based log receivers _ "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator/output/stdout" + _ "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator/parser/container" _ "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator/parser/csv" _ "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator/parser/json" _ "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator/parser/jsonarray" diff --git a/pkg/stanza/docs/operators/container.md b/pkg/stanza/docs/operators/container.md new file mode 100644 index 000000000000..4cc972fbc5ed --- /dev/null +++ b/pkg/stanza/docs/operators/container.md @@ -0,0 +1,238 @@ +## `container` operator + +The `container` operator parses logs in `docker`, `cri-o` and `containerd` formats. + +### Configuration Fields + +| Field | Default | Description | +|------------------------------|------------------|---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| +| `id` | `container` | A unique identifier for the operator. | +| `format` | `` | The container log format to use if it is known. Users can choose between `docker`, `crio` and `containerd`. If not set, the format will be automatically detected. | +| `add_metadata_from_filepath` | `true` | Set if k8s metadata should be added from the file path. Requires the `log.file.path` field to be present. | +| `output` | Next in pipeline | The connected operator(s) that will receive all outbound entries. | +| `parse_from` | `body` | The [field](../types/field.md) from which the value will be parsed. | +| `parse_to` | `attributes` | The [field](../types/field.md) to which the value will be parsed. | +| `on_error` | `send` | The behavior of the operator if it encounters an error. See [on_error](../types/on_error.md). | +| `if` | | An [expression](../types/expression.md) that, when set, will be evaluated to determine whether this operator should be used for the given entry. This allows you to do easy conditional parsing without branching logic with routers. | +| `severity` | `nil` | An optional [severity](../types/severity.md) block which will parse a severity field before passing the entry to the output operator. | + + +### Embedded Operations + +The `container` parser can be configured to embed certain operations such as the severity parsing. For more information, see [complex parsers](../types/parsers.md#complex-parsers). + +### Add metadata from file path + +Requires `include_file_path: true` in order for the `log.file.path` field to be available for the operator. +If that's not possible, users can disable the metadata addition with `add_metadata_from_filepath: false`. +A file path like `"/var/log/pods/some-ns_kube-controller-kind-control-plane_49cc7c1fd3702c40b2686ea7486091d6/kube-controller/1.log"`, +will produce the following k8s metadata: + +```json +{ + "attributes": { + "k8s": { + "container": { + "name": "kube-controller", + "restart_count": "1" + }, "pod": { + "uid": "49cc7c1fd3702c40b2686ea7486091d6", + "name": "kube-controller-kind-control-plane" + }, "namespace": { + "name": "some-ns" + } + } + } +} +``` + +### Example Configurations: + +#### Parse the body as docker container log + +Configuration: +```yaml +- type: container + format: docker + add_metadata_from_filepath: true +``` + +Note: in this example the `format: docker` is optional since formats can be automatically detected as well. + `add_metadata_from_filepath` is true by default as well. + + + + + + + +
Input body Output body
+ +```json +{ + "timestamp": "", + "body": "{\"log\":\"INFO: log line here\",\"stream\":\"stdout\",\"time\":\"2029-03-30T08:31:20.545192187Z\"}", + "log.file.path": "/var/log/pods/some_kube-controller-kind-control-plane_49cc7c1fd3702c40b2686ea7486091d6/kube-controller/1.log" +} +``` + + + +```json +{ + "timestamp": "2024-03-30 08:31:20.545192187 +0000 UTC", + "body": "log line here", + "attributes": { + "time": "2024-03-30T08:31:20.545192187Z", + "log.iostream": "stdout", + "k8s.pod.name": "kube-controller-kind-control-plane", + "k8s.pod.uid": "49cc7c1fd3702c40b2686ea7486091d6", + "k8s.container.name": "kube-controller", + "k8s.container.restart_count": "1", + "k8s.namespace.name": "some", + "log.file.path": "/var/log/pods/some_kube-controller-kind-control-plane_49cc7c1fd3702c40b2686ea7486091d6/kube-controller/1.log" + } +} +``` + +
+ +#### Parse the body as cri-o container log + +Configuration: +```yaml +- type: container +``` + + + + + + + +
Input body Output body
+ +```json +{ + "timestamp": "", + "body": "2024-04-13T07:59:37.505201169-05:00 stdout F standalone crio line which is awesome", + "log.file.path": "/var/log/pods/some_kube-controller-kind-control-plane_49cc7c1fd3702c40b2686ea7486091d6/kube-controller/1.log" +} +``` + + + +```json +{ + "timestamp": "2024-04-13 12:59:37.505201169 +0000 UTC", + "body": "standalone crio line which is awesome", + "attributes": { + "time": "2024-04-13T07:59:37.505201169-05:00", + "logtag": "F", + "log.iostream": "stdout", + "k8s.pod.name": "kube-controller-kind-control-plane", + "k8s.pod.uid": "49cc7c1fd3702c40b2686ea7486091d6", + "k8s.container.name": "kube-controller", + "k8s.container.restart_count": "1", + "k8s.namespace.name": "some", + "log.file.path": "/var/log/pods/some_kube-controller-kind-control-plane_49cc7c1fd3702c40b2686ea7486091d6/kube-controller/1.log" + } +} +``` + +
+ +#### Parse the body as containerd container log + +Configuration: +```yaml +- type: container +``` + + + + + + + +
Input body Output body
+ +```json +{ + "timestamp": "", + "body": "2023-06-22T10:27:25.813799277Z stdout F standalone containerd line that is super awesome", + "log.file.path": "/var/log/pods/some_kube-controller-kind-control-plane_49cc7c1fd3702c40b2686ea7486091d6/kube-controller/1.log" +} +``` + + + +```json +{ + "timestamp": "2023-06-22 10:27:25.813799277 +0000 UTC", + "body": "standalone containerd line that is super awesome", + "attributes": { + "time": "2023-06-22T10:27:25.813799277Z", + "logtag": "F", + "log.iostream": "stdout", + "k8s.pod.name": "kube-controller-kind-control-plane", + "k8s.pod.uid": "49cc7c1fd3702c40b2686ea7486091d6", + "k8s.container.name": "kube-controller", + "k8s.container.restart_count": "1", + "k8s.namespace.name": "some", + "log.file.path": "/var/log/pods/some_kube-controller-kind-control-plane_49cc7c1fd3702c40b2686ea7486091d6/kube-controller/1.log" + } +} +``` + +
+ +#### Parse the multiline as containerd container log and recombine into a single one + +Configuration: +```yaml +- type: container +``` + + + + + + + +
Input body Output body
+ +```json +{ + "timestamp": "", + "body": "2023-06-22T10:27:25.813799277Z stdout P multiline containerd line that i", + "log.file.path": "/var/log/pods/some_kube-controller-kind-control-plane_49cc7c1fd3702c40b2686ea7486091d6/kube-controller/1.log" +}, +{ + "timestamp": "", + "body": "2023-06-22T10:27:25.813799277Z stdout F s super awesomne", + "log.file.path": "/var/log/pods/some_kube-controller-kind-control-plane_49cc7c1fd3702c40b2686ea7486091d6/kube-controller/1.log" +} +``` + + + +```json +{ + "timestamp": "2023-06-22 10:27:25.813799277 +0000 UTC", + "body": "multiline containerd line that is super awesome", + "attributes": { + "time": "2023-06-22T10:27:25.813799277Z", + "logtag": "F", + "log.iostream": "stdout", + "k8s.pod.name": "kube-controller-kind-control-plane", + "k8s.pod.uid": "49cc7c1fd3702c40b2686ea7486091d6", + "k8s.container.name": "kube-controller", + "k8s.container.restart_count": "1", + "k8s.namespace.name": "some", + "log.file.path": "/var/log/pods/some_kube-controller-kind-control-plane_49cc7c1fd3702c40b2686ea7486091d6/kube-controller/1.log" + } +} +``` + +
\ No newline at end of file diff --git a/pkg/stanza/operator/helper/regexp.go b/pkg/stanza/operator/helper/regexp.go new file mode 100644 index 000000000000..7306926ced79 --- /dev/null +++ b/pkg/stanza/operator/helper/regexp.go @@ -0,0 +1,28 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package helper // import "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator/helper" + +import ( + "fmt" + "regexp" +) + +func MatchValues(value string, regexp *regexp.Regexp) (map[string]any, error) { + matches := regexp.FindStringSubmatch(value) + if matches == nil { + return nil, fmt.Errorf("regex pattern does not match") + } + + parsedValues := map[string]any{} + for i, subexp := range regexp.SubexpNames() { + if i == 0 { + // Skip whole match + continue + } + if subexp != "" { + parsedValues[subexp] = matches[i] + } + } + return parsedValues, nil +} diff --git a/pkg/stanza/operator/parser/container/config.go b/pkg/stanza/operator/parser/container/config.go new file mode 100644 index 000000000000..fb6555708182 --- /dev/null +++ b/pkg/stanza/operator/parser/container/config.go @@ -0,0 +1,120 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package container // import "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator/parser/container" + +import ( + "fmt" + "sync" + + jsoniter "github.com/json-iterator/go" + "go.opentelemetry.io/collector/component" + + "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/entry" + "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/errors" + "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator" + "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator/helper" + "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator/transformer/recombine" +) + +const operatorType = "container" + +func init() { + operator.Register(operatorType, func() operator.Builder { return NewConfig() }) +} + +// NewConfig creates a new JSON parser config with default values +func NewConfig() *Config { + return NewConfigWithID(operatorType) +} + +// NewConfigWithID creates a new JSON parser config with default values +func NewConfigWithID(operatorID string) *Config { + return &Config{ + ParserConfig: helper.NewParserConfig(operatorID, operatorType), + Format: "", + AddMetadataFromFilePath: true, + } +} + +// Config is the configuration of a Container parser operator. +type Config struct { + helper.ParserConfig `mapstructure:",squash"` + + Format string `mapstructure:"format"` + AddMetadataFromFilePath bool `mapstructure:"add_metadata_from_filepath"` +} + +// Build will build a Container parser operator. +func (c Config) Build(set component.TelemetrySettings) (operator.Operator, error) { + parserOperator, err := c.ParserConfig.Build(set) + if err != nil { + return nil, err + } + + cLogEmitter := helper.NewLogEmitter(set.Logger.Sugar()) + recombineParser, err := createRecombine(set, cLogEmitter) + if err != nil { + return nil, fmt.Errorf("failed to create internal recombine config: %w", err) + } + + wg := sync.WaitGroup{} + + if c.Format != "" { + switch c.Format { + case dockerFormat, crioFormat, containerdFormat: + default: + return &Parser{}, errors.NewError( + "operator config has an invalid `format` field.", + "ensure that the `format` field is set to one of `docker`, `crio`, `containerd`.", + "format", c.OnError, + ) + } + } + + p := &Parser{ + ParserOperator: parserOperator, + recombineParser: recombineParser, + json: jsoniter.ConfigFastest, + format: c.Format, + addMetadataFromFilepath: c.AddMetadataFromFilePath, + crioLogEmitter: cLogEmitter, + criConsumers: &wg, + } + return p, nil +} + +// createRecombine creates an internal recombine operator which outputs to an async helper.LogEmitter +// the equivalent recombine config: +// +// combine_field: body +// combine_with: "" +// is_last_entry: attributes.logtag == 'F' +// max_log_size: 102400 +// source_identifier: attributes["log.file.path"] +// type: recombine +func createRecombine(set component.TelemetrySettings, cLogEmitter *helper.LogEmitter) (operator.Operator, error) { + recombineParserCfg := createRecombineConfig() + recombineParser, err := recombineParserCfg.Build(set) + if err != nil { + return nil, fmt.Errorf("failed to resolve internal recombine config: %w", err) + } + + // set the LogEmmiter as the output of the recombine parser + recombineParser.SetOutputIDs([]string{cLogEmitter.OperatorID}) + if err := recombineParser.SetOutputs([]operator.Operator{cLogEmitter}); err != nil { + return nil, fmt.Errorf("failed to set outputs of internal recombine") + } + + return recombineParser, nil +} + +func createRecombineConfig() *recombine.Config { + recombineParserCfg := recombine.NewConfigWithID(recombineInternalID) + recombineParserCfg.IsLastEntry = "attributes.logtag == 'F'" + recombineParserCfg.CombineField = entry.NewBodyField() + recombineParserCfg.CombineWith = "" + recombineParserCfg.SourceIdentifier = entry.NewAttributeField("log.file.path") + recombineParserCfg.MaxLogSize = 102400 + return recombineParserCfg +} diff --git a/pkg/stanza/operator/parser/container/config_test.go b/pkg/stanza/operator/parser/container/config_test.go new file mode 100644 index 000000000000..599c26c1b7fd --- /dev/null +++ b/pkg/stanza/operator/parser/container/config_test.go @@ -0,0 +1,107 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 +package container + +import ( + "path/filepath" + "testing" + + "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/entry" + "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator/helper" + "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator/operatortest" +) + +func TestConfig(t *testing.T) { + operatortest.ConfigUnmarshalTests{ + DefaultConfig: NewConfig(), + TestsFile: filepath.Join(".", "testdata", "config.yaml"), + Tests: []operatortest.ConfigUnmarshalTest{ + { + Name: "default", + Expect: NewConfig(), + }, + { + Name: "parse_from_simple", + Expect: func() *Config { + cfg := NewConfig() + cfg.ParseFrom = entry.NewBodyField("from") + return cfg + }(), + }, + { + Name: "parse_to_simple", + Expect: func() *Config { + cfg := NewConfig() + cfg.ParseTo = entry.RootableField{Field: entry.NewBodyField("log")} + return cfg + }(), + }, + { + Name: "on_error_drop", + Expect: func() *Config { + cfg := NewConfig() + cfg.OnError = "drop" + return cfg + }(), + }, + { + Name: "severity", + Expect: func() *Config { + cfg := NewConfig() + parseField := entry.NewBodyField("severity_field") + severityField := helper.NewSeverityConfig() + severityField.ParseFrom = &parseField + mapping := map[string]any{ + "critical": "5xx", + "error": "4xx", + "info": "3xx", + "debug": "2xx", + } + severityField.Mapping = mapping + cfg.SeverityConfig = &severityField + return cfg + }(), + }, + { + Name: "format", + Expect: func() *Config { + cfg := NewConfig() + cfg.Format = "docker" + return cfg + }(), + }, + { + Name: "add_metadata_from_file_path", + Expect: func() *Config { + cfg := NewConfig() + cfg.AddMetadataFromFilePath = true + return cfg + }(), + }, + { + Name: "parse_to_attributes", + Expect: func() *Config { + p := NewConfig() + p.ParseTo = entry.RootableField{Field: entry.NewAttributeField()} + return p + }(), + }, + { + Name: "parse_to_body", + Expect: func() *Config { + p := NewConfig() + p.ParseTo = entry.RootableField{Field: entry.NewBodyField()} + return p + }(), + }, + { + Name: "parse_to_resource", + Expect: func() *Config { + p := NewConfig() + p.ParseTo = entry.RootableField{Field: entry.NewResourceField()} + return p + }(), + }, + }, + }.Run(t) +} diff --git a/pkg/stanza/operator/parser/container/package_test.go b/pkg/stanza/operator/parser/container/package_test.go new file mode 100644 index 000000000000..245776eec13d --- /dev/null +++ b/pkg/stanza/operator/parser/container/package_test.go @@ -0,0 +1,14 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package container + +import ( + "testing" + + "go.uber.org/goleak" +) + +func TestMain(m *testing.M) { + goleak.VerifyTestMain(m) +} diff --git a/pkg/stanza/operator/parser/container/parser.go b/pkg/stanza/operator/parser/container/parser.go new file mode 100644 index 000000000000..d531925e9735 --- /dev/null +++ b/pkg/stanza/operator/parser/container/parser.go @@ -0,0 +1,357 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package container // import "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator/parser/container" + +import ( + "context" + "errors" + "fmt" + "regexp" + "strings" + "sync" + "time" + + jsoniter "github.com/json-iterator/go" + + "github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal/timeutils" + "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/entry" + "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator" + "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator/helper" +) + +const dockerFormat = "docker" +const crioFormat = "crio" +const containerdFormat = "containerd" +const recombineInternalID = "recombine_container_internal" +const dockerPattern = "^\\{" +const crioPattern = "^(?P