Skip to content

Commit

Permalink
feat: Add Chronicle Forwarder (#1382)
Browse files Browse the repository at this point in the history
* Add Chronicle Forwarder

* lint

* gosec

* PR feddback

* Break down functions

* Remove extra fields from docs

* Remove writer from struct

* Update readme and validate functions
  • Loading branch information
Miguel Rodriguez authored Jan 4, 2024
1 parent 1d78771 commit d409ef4
Show file tree
Hide file tree
Showing 17 changed files with 1,274 additions and 0 deletions.
1 change: 1 addition & 0 deletions docs/exporters.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ Below is a list of supported exporters with links to their documentation pages.
| Azure Monitor Exporter | [azuremonitorexporter](https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/v0.91.0/exporter/azuremonitorexporter/README.md) |
| Carbon Exporter | [carbonexporter](https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/v0.91.0/exporter/carbonexporter/README.md) |
| Chronicle Exporter | [chronicleexporter](../exporter/chronicleexporter/README.md) |
| Chronicle Forwarder Exporter | [chronicleexporter](../exporter/chronicleforwarderexporter/README.md) |
| ClickHouse Exporter | [clickhouseexporter](https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/v0.91.0/exporter/clickhouseexporter/README.md) |
| Coralogix Exporter | [coralogixexporter](https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/v0.91.0/exporter/coralogixexporter/README.md) |
| Datadog Exporter | [datadogexporter](https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/v0.91.0/exporter/datadogexporter/README.md) |
Expand Down
52 changes: 52 additions & 0 deletions exporter/chronicleforwarderexporter/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
# Chronicle Forwarder Exporter

The Chronicle Forwarder Exporter is designed for forwarding logs to a Chronicle Forwarder endpoint using either Syslog or File-based methods. This exporter supports customization of data export types and various configuration options to tailor the connection and data handling to specific needs.

## Minimum Agent Versions

- Introduced: [v1.42.0](https://github.com/observIQ/bindplane-agent/releases/tag/v1.42.0)

## Supported Pipelines

- Logs

## How It Works

1. For Syslog, it establishes a network connection to the specified Chronicle forwarder endpoint.
2. For File, it writes logs to a specified file path.

## Configuration

| Field | Type | Default Value | Required | Description |
| -------------------- | ------ | ----------------- | -------- | ------------------------------------------------- |
| export_type | string | `syslog` | `true` | Type of export, either `syslog` or `file`. |
| raw_log_field | string | | `false` | The field name to send raw logs to Chronicle. |
| syslog.endpoint | string | `127.0.0.1:10514` | `false` | The Chronicle forwarder endpoint. |
| syslog.network | string | `tcp` | `false` | The network protocol to use (e.g., `tcp`, `udp`). |
| syslog.tls.key_file | string | | `false` | Configure the receiver to use TLS. |
| syslog.tls.cert_file | string | | `false` | Configure the receiver to use TLS. |
| file.path | string | | `false` | The path to the file for storing logs. |

## Example Configurations

### Syslog Configuration Example

```yaml
chronicleforwarder:
export_type: "syslog"
syslog:
host: "syslog.example.com"
port: 10514
network: "tcp"
```
### File Configuration Example
```yaml
chronicleforwarder:
export_type: "file"
file:
path: "/path/to/logfile"
```
---
113 changes: 113 additions & 0 deletions exporter/chronicleforwarderexporter/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
// Copyright observIQ, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package chronicleforwarderexporter

import (
"errors"
"fmt"

"github.com/observiq/bindplane-agent/expr"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/config/confignet"
"go.opentelemetry.io/collector/config/configtls"
"go.opentelemetry.io/collector/exporter/exporterhelper"
"go.uber.org/zap"
)

const (
// exportTypeSyslog is the syslog export type.
exportTypeSyslog = "syslog"

// exportTypeFile is the file export type.
exportTypeFile = "file"
)

// Config defines configuration for the Chronicle exporter.
type Config struct {
exporterhelper.TimeoutSettings `mapstructure:",squash"`
exporterhelper.QueueSettings `mapstructure:"sending_queue"`
exporterhelper.RetrySettings `mapstructure:"retry_on_failure"`

// ExportType is the type of export to use.
ExportType string `mapstructure:"export_type"`

// Syslog is the configuration for the connection to the Chronicle forwarder.
Syslog SyslogConfig `mapstructure:"syslog"`

// File is the configuration for the connection to the Chronicle forwarder.
File File `mapstructure:"file"`

// RawLogField is the field name that will be used to send raw logs to Chronicle.
RawLogField string `mapstructure:"raw_log_field"`
}

// SyslogConfig defines configuration for the Chronicle forwarder connection.
type SyslogConfig struct {
confignet.NetAddr `mapstructure:",squash"`

// TLSSetting struct exposes TLS client configuration.
TLSSetting *configtls.TLSClientSetting `mapstructure:"tls"`
}

// File defines configuration for sending to.
type File struct {
// Path is the path to the file to send to Chronicle.
Path string `mapstructure:"path"`
}

// validate validates the Syslog configuration.
func (s *SyslogConfig) validate() error {
if s.NetAddr.Endpoint == "" {
return errors.New("incomplete syslog configuration: endpoint is required")
}
return nil
}

// validate validates the File configuration.
func (f *File) validate() error {
if f.Path == "" {
return errors.New("file path is required for file export type")
}
return nil
}

// Validate validates the Chronicle exporter configuration.
func (cfg *Config) Validate() error {
if cfg.ExportType != exportTypeSyslog && cfg.ExportType != exportTypeFile {
return errors.New("export_type must be either 'syslog' or 'file'")
}

if cfg.ExportType == exportTypeSyslog {
if err := cfg.Syslog.validate(); err != nil {
return err
}
}

if cfg.ExportType == exportTypeFile {
if err := cfg.File.validate(); err != nil {
return err
}
}

if cfg.RawLogField != "" {
_, err := expr.NewOTTLLogRecordExpression(cfg.RawLogField, component.TelemetrySettings{
Logger: zap.NewNop(),
})
if err != nil {
return fmt.Errorf("raw_log_field is invalid: %s", err)
}
}
return nil
}
86 changes: 86 additions & 0 deletions exporter/chronicleforwarderexporter/config_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
// Copyright observIQ, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package chronicleforwarderexporter

import (
"testing"

"github.com/stretchr/testify/require"
"go.opentelemetry.io/collector/config/confignet"
)

func TestConfig_Validate(t *testing.T) {
tests := []struct {
name string
cfg Config
wantErr bool
}{
{
name: "Valid syslog config",
cfg: Config{
ExportType: exportTypeSyslog,
Syslog: SyslogConfig{
NetAddr: confignet.NetAddr{
Endpoint: "localhost:514",
Transport: "tcp",
},
},
},
wantErr: false,
},
{
name: "Invalid syslog config - missing host",
cfg: Config{
ExportType: exportTypeSyslog,
Syslog: SyslogConfig{
NetAddr: confignet.NetAddr{
Endpoint: "",
Transport: "tcp",
},
},
},
wantErr: true,
},
{
name: "Valid file config",
cfg: Config{
ExportType: exportTypeFile,
File: File{
Path: "/path/to/file",
},
},
wantErr: false,
},
{
name: "Invalid file config - missing path",
cfg: Config{
ExportType: exportTypeFile,
File: File{},
},
wantErr: true,
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
err := tt.cfg.Validate()
if tt.wantErr {
require.Error(t, err)
} else {
require.NoError(t, err)
}
})
}
}
18 changes: 18 additions & 0 deletions exporter/chronicleforwarderexporter/doc.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
// Copyright observIQ, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//go:generate mdatagen metadata.yaml

// Package chronicleforwarderexporter exports OpenTelemetry data to an endpoint or file.
package chronicleforwarderexporter // import "github.com/observiq/bindplane-agent/exporter/azureblobexporter"
115 changes: 115 additions & 0 deletions exporter/chronicleforwarderexporter/exporter.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
// Copyright observIQ, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package chronicleforwarderexporter

import (
"context"
"crypto/tls"
"errors"
"fmt"
"io"
"net"
"os"
"strings"

"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/exporter"
"go.opentelemetry.io/collector/pdata/plog"
"go.uber.org/zap"
)

type chronicleForwarderExporter struct {
cfg *Config
logger *zap.Logger
marshaler logMarshaler
endpoint string
}

func newExporter(cfg *Config, params exporter.CreateSettings) (*chronicleForwarderExporter, error) {
return &chronicleForwarderExporter{
cfg: cfg,
logger: params.Logger,
marshaler: newMarshaler(*cfg, params.TelemetrySettings),
}, nil
}

func (ce *chronicleForwarderExporter) Capabilities() consumer.Capabilities {
return consumer.Capabilities{MutatesData: false}
}

func (ce *chronicleForwarderExporter) logsDataPusher(ctx context.Context, ld plog.Logs) error {
// Open connection or file before sending each payload
writer, err := ce.openWriter()
if err != nil {
return fmt.Errorf("open writer: %w", err)
}
defer writer.Close()

payloads, err := ce.marshaler.MarshalRawLogs(ctx, ld)
if err != nil {
return fmt.Errorf("marshal logs: %w", err)
}

for _, payload := range payloads {
if err := ce.send(payload, writer); err != nil {
return fmt.Errorf("upload to Chronicle forwarder: %w", err)
}
}

return nil
}

func (ce *chronicleForwarderExporter) openWriter() (io.WriteCloser, error) {
switch ce.cfg.ExportType {
case exportTypeSyslog:
return ce.openSyslogWriter()
case exportTypeFile:
return ce.openFileWriter()
default:
return nil, errors.New("unsupported export type")
}
}

func (ce *chronicleForwarderExporter) openFileWriter() (io.WriteCloser, error) {
return os.OpenFile(ce.cfg.File.Path, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0600)
}

func (ce *chronicleForwarderExporter) openSyslogWriter() (io.WriteCloser, error) {
var conn net.Conn
var err error
if ce.cfg.Syslog.TLSSetting != nil {
tlsConfig, err := ce.cfg.Syslog.TLSSetting.LoadTLSConfig()
if err != nil {
return nil, fmt.Errorf("load TLS config: %w", err)
}
conn, err = tls.Dial(ce.cfg.Syslog.NetAddr.Transport, ce.cfg.Syslog.NetAddr.Endpoint, tlsConfig)
} else {
conn, err = net.Dial(ce.cfg.Syslog.NetAddr.Transport, ce.cfg.Syslog.NetAddr.Endpoint)
}

if err != nil {
return nil, fmt.Errorf("dial: %w", err)
}
return conn, nil
}

func (ce *chronicleForwarderExporter) send(msg string, writer io.WriteCloser) error {
if !strings.HasSuffix(msg, "\n") {
msg = fmt.Sprintf("%s%s", msg, "\n")
}

_, err := io.WriteString(writer, msg)
return err
}
Loading

0 comments on commit d409ef4

Please sign in to comment.