Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Host decorator #85

Merged
merged 4 commits into from
Aug 12, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,9 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
## Unreleased
### Fixed
- Google Cloud Output failure when sent a field of type uint16
### Added
- Added a default function to plugin templates
- Add a host metadata operator that adds hostname to entries

## [0.9.7] - 2020-08-05
### Changed
Expand Down
1 change: 1 addition & 0 deletions docs/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ General purpose:
- [Restructure records](/docs/operators/restructure.md)
- [Router](/docs/operators/router.md)
- [Kubernetes Metadata Decorator](/docs/operators/k8s_metadata_decorator.md)
- [Host Metadata](/docs/operators/host_metadata.md)
- [Rate limit](/docs/operators/rate_limit.md)

Or create your own [plugins](/docs/plugins.md) for a technology-specific use case.
56 changes: 56 additions & 0 deletions docs/operators/host_metadata.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
## `host_metadata` operator

The `host_metadata` operator adds labels to incoming entries.

### Configuration Fields

| Field | Default | Description |
| --- | --- | --- |
| `id` | `metadata` | A unique identifier for the operator |
| `output` | Next in pipeline | The connected operator(s) that will receive all outbound entries |
| `include_hostname` | `true` | Whether to set the `hostname` label on entries |
| `on_error` | `send` | The behavior of the operator if it encounters an error. See [on_error](/docs/types/on_error.md) |

### Example Configurations

#### Add static tags and labels

Configuration:
```yaml
- type: host_metadata
include_hostname: true
```

<table>
<tr><td> Input entry </td> <td> Output entry </td></tr>
<tr>
<td>

```json
{
"timestamp": "2020-06-15T11:15:50.475364-04:00",
"labels": {},
"record": {
"message": "test"
}
}
```

</td>
<td>

```json
{
"timestamp": "2020-06-15T11:15:50.475364-04:00",
"labels": {
"hostname": "my_host"
},
"record": {
"message": "test"
}
}
```

</td>
</tr>
</table>
51 changes: 9 additions & 42 deletions internal/testutil/mocks.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,50 +30,17 @@ func NewFakeOutput(t TestingT) *FakeOutput {
}
}

func (f *FakeOutput) CanOutput() bool {
return false
}

func (f *FakeOutput) CanProcess() bool {
return true
}
func (f *FakeOutput) CanOutput() bool { return false }
func (f *FakeOutput) CanProcess() bool { return true }
func (f *FakeOutput) ID() string { return "fake" }
func (f *FakeOutput) Logger() *zap.SugaredLogger { return f.SugaredLogger }
func (f *FakeOutput) Outputs() []operator.Operator { return nil }
func (f *FakeOutput) SetOutputs(outputs []operator.Operator) error { return nil }
func (f *FakeOutput) Start() error { return nil }
func (f *FakeOutput) Stop() error { return nil }
func (f *FakeOutput) Type() string { return "fake_output" }

func (f *FakeOutput) ID() string {
return "fake"
}

// Logger provides a mock function with given fields:
func (f *FakeOutput) Logger() *zap.SugaredLogger {
return f.SugaredLogger
}

// Outputs provides a mock function with given fields:
func (f *FakeOutput) Outputs() []operator.Operator {
return nil
}

// Process provides a mock function with given fields: _a0, _a1
func (f *FakeOutput) Process(ctx context.Context, entry *entry.Entry) error {
f.Received <- entry
return nil
}

// SetOutputs provides a mock function with given fields: _a0
func (f *FakeOutput) SetOutputs(outputs []operator.Operator) error {
return nil
}

// Start provides a mock function with given fields:
func (f *FakeOutput) Start() error {
return nil
}

// Stop provides a mock function with given fields:
func (f *FakeOutput) Stop() error {
return nil
}

// Type provides a mock function with given fields:
func (f *FakeOutput) Type() string {
return "fake_output"
}
132 changes: 132 additions & 0 deletions operator/builtin/transformer/host_metadata.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,132 @@
package transformer

import (
"context"
"net"
"os"

"github.com/observiq/carbon/entry"
"github.com/observiq/carbon/errors"
"github.com/observiq/carbon/operator"
"github.com/observiq/carbon/operator/helper"
)

func init() {
operator.Register("host_metadata", func() operator.Builder { return NewHostMetadataConfig("") })
}

// Variables that are overridable for testing
var hostname = os.Hostname

// NewHostMetadataConfig returns a HostMetadataConfig with default values
func NewHostMetadataConfig(operatorID string) *HostMetadataConfig {
return &HostMetadataConfig{
TransformerConfig: helper.NewTransformerConfig(operatorID, "host_decorator"),
IncludeHostname: true,
IncludeIP: true,
}
}

//
type HostMetadataConfig struct {
helper.TransformerConfig `yaml:",inline"`
IncludeHostname bool `json:"include_hostname,omitempty" yaml:"include_hostname,omitempty"`
IncludeIP bool `json:"include_ip,omitempty" yaml:"include_ip,omitempty"`
}

// Build will build an operator from the supplied configuration
func (c HostMetadataConfig) Build(context operator.BuildContext) (operator.Operator, error) {
transformerOperator, err := c.TransformerConfig.Build(context)
if err != nil {
return nil, errors.Wrap(err, "failed to build transformer")
}

op := &HostMetadata{
TransformerOperator: transformerOperator,
includeHostname: c.IncludeHostname,
includeIP: c.IncludeIP,
}

if c.IncludeHostname {
op.hostname, err = hostname()
if err != nil {
return nil, errors.Wrap(err, "get hostname")
}
}

if c.IncludeIP {
ip, err := getIP()
if err != nil {
return nil, errors.Wrap(err, "get ip address")
}
op.ip = ip
}

return op, nil
}

func getIP() (string, error) {
var ip string

ifaces, err := net.Interfaces()
if err != nil {
return "", errors.Wrap(err, "list interfaces")
}

for _, iface := range ifaces {
// Skip loopback interfaces
if iface.Flags&net.FlagLoopback != 0 {
continue
}

// Skip down interfaces
if iface.Flags&net.FlagUp == 0 {
continue
}

addrs, err := iface.Addrs()
if err != nil {
continue
}
if len(addrs) > 0 {
ip = addrs[0].String()
}
}

if len(ip) == 0 {
return "", errors.NewError(
"failed to find ip address",
"check that a non-loopback interface with an assigned IP address exists and is running",
)
}

return ip, nil
}

// HostMetadata is an operator that can add host metadata to incoming entries
type HostMetadata struct {
helper.TransformerOperator

hostname string
ip string
includeHostname bool
includeIP bool
}

// Process will process an incoming entry using the metadata transform.
func (h *HostMetadata) Process(ctx context.Context, entry *entry.Entry) error {
return h.ProcessWith(ctx, entry, h.Transform)
}

// Transform will transform an entry, adding the configured host metadata.
func (h *HostMetadata) Transform(entry *entry.Entry) (*entry.Entry, error) {
if h.includeHostname {
entry.AddLabel("hostname", h.hostname)
}

if h.includeIP {
entry.AddLabel("ip", h.ip)
}

return entry, nil
}
130 changes: 130 additions & 0 deletions operator/builtin/transformer/host_metadata_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,130 @@
package transformer

import (
"context"
"os"
"sync"
"testing"

"github.com/observiq/carbon/entry"
"github.com/observiq/carbon/internal/testutil"
"github.com/observiq/carbon/operator"
"github.com/stretchr/testify/require"
)

func testHostname() (string, error) {
return "test", nil
}

func TestHostMetadata(t *testing.T) {
cases := []struct {
name string
modifyConfig func(*HostMetadataConfig)
expectedLabels map[string]string
fakeHostname func() (string, error)
}{
{
"Default",
func(cfg *HostMetadataConfig) {
cfg.IncludeIP = false
},
map[string]string{
"hostname": "test",
},
testHostname,
},
{
"NoHostname",
func(cfg *HostMetadataConfig) {
cfg.IncludeHostname = false
cfg.IncludeIP = false
},
nil,
testHostname,
},
}

for _, tc := range cases {
t.Run(tc.name, func(t *testing.T) {
hostname = tc.fakeHostname
defer func() { hostname = os.Hostname }()

cfg := NewHostMetadataConfig("test_id")
cfg.OutputIDs = []string{"fake"}
tc.modifyConfig(cfg)

op, err := cfg.Build(testutil.NewBuildContext(t))
require.NoError(t, err)

fake := testutil.NewFakeOutput(t)
err = op.SetOutputs([]operator.Operator{fake})
require.NoError(t, err)

e := entry.New()
op.Process(context.Background(), e)
select {
case r := <-fake.Received:
require.Equal(t, tc.expectedLabels, r.Labels)
default:
require.FailNow(t, "Expected entry")
}
})
}
}

type hostMetadataBenchmark struct {
name string
cfgMod func(*HostMetadataConfig)
}

func (g *hostMetadataBenchmark) Run(b *testing.B) {
cfg := NewHostMetadataConfig(g.name)
g.cfgMod(cfg)
op, err := cfg.Build(testutil.NewBuildContext(b))
require.NoError(b, err)

fake := testutil.NewFakeOutput(b)
op.(*HostMetadata).OutputOperators = []operator.Operator{fake}

b.ResetTimer()
var wg sync.WaitGroup
wg.Add(1)
go func() {
defer wg.Done()
for i := 0; i < b.N; i++ {
e := entry.New()
op.Process(context.Background(), e)
}
err = op.Stop()
require.NoError(b, err)
}()

wg.Add(1)
go func() {
defer wg.Done()
for i := 0; i < b.N; i++ {
<-fake.Received
}
}()

wg.Wait()
}

func BenchmarkHostMetadata(b *testing.B) {
cases := []hostMetadataBenchmark{
{
"Default",
func(cfg *HostMetadataConfig) {},
},
{
"NoHostname",
func(cfg *HostMetadataConfig) {
cfg.IncludeHostname = false
},
},
}

for _, tc := range cases {
b.Run(tc.name, tc.Run)
}
}