Skip to content

Commit

Permalink
[receiver/netflow] Add the netflow receiver - PR 1 (open-telemetry#34164
Browse files Browse the repository at this point in the history
)

Co-authored-by: Curtis Robert <[email protected]>
Co-authored-by: Evan Bradley <[email protected]>
  • Loading branch information
3 people authored Nov 25, 2024
1 parent a8a8b6d commit 5698c9d
Show file tree
Hide file tree
Showing 26 changed files with 791 additions and 1 deletion.
27 changes: 27 additions & 0 deletions .chloggen/netflow-receiver.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: new_component

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: netflowreceiver

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Introduce the netflow receiver

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [32732]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:

# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: [user]
1 change: 1 addition & 0 deletions .github/CODEOWNERS
Validating CODEOWNERS rules …
Original file line number Diff line number Diff line change
Expand Up @@ -249,6 +249,7 @@ receiver/mongodbatlasreceiver/ @open-telemetry/collector-cont
receiver/mongodbreceiver/ @open-telemetry/collector-contrib-approvers @djaglowski @schmikei
receiver/mysqlreceiver/ @open-telemetry/collector-contrib-approvers @djaglowski
receiver/namedpipereceiver/ @open-telemetry/collector-contrib-approvers @sinkingpoint @djaglowski
receiver/netflowreceiver/ @open-telemetry/collector-contrib-approvers @evan-bradley @dlopes7
receiver/nginxreceiver/ @open-telemetry/collector-contrib-approvers @djaglowski
receiver/nsxtreceiver/ @open-telemetry/collector-contrib-approvers @dashpole @schmikei
receiver/ntpreceiver/ @open-telemetry/collector-contrib-approvers @atoulme
Expand Down
1 change: 1 addition & 0 deletions .github/ISSUE_TEMPLATE/bug_report.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -246,6 +246,7 @@ body:
- receiver/mongodbatlas
- receiver/mysql
- receiver/namedpipe
- receiver/netflow
- receiver/nginx
- receiver/nsxt
- receiver/ntp
Expand Down
1 change: 1 addition & 0 deletions .github/ISSUE_TEMPLATE/feature_request.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -240,6 +240,7 @@ body:
- receiver/mongodbatlas
- receiver/mysql
- receiver/namedpipe
- receiver/netflow
- receiver/nginx
- receiver/nsxt
- receiver/ntp
Expand Down
1 change: 1 addition & 0 deletions .github/ISSUE_TEMPLATE/other.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -240,6 +240,7 @@ body:
- receiver/mongodbatlas
- receiver/mysql
- receiver/namedpipe
- receiver/netflow
- receiver/nginx
- receiver/nsxt
- receiver/ntp
Expand Down
1 change: 1 addition & 0 deletions .github/ISSUE_TEMPLATE/unmaintained.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -245,6 +245,7 @@ body:
- receiver/mongodbatlas
- receiver/mysql
- receiver/namedpipe
- receiver/netflow
- receiver/nginx
- receiver/nsxt
- receiver/ntp
Expand Down
3 changes: 2 additions & 1 deletion cmd/githubgen/allowlist.txt
Original file line number Diff line number Diff line change
Expand Up @@ -19,4 +19,5 @@ Hemansh31
shazlehu
dsimil
KiranmayiB
harishbohara11
harishbohara11
dlopes7
1 change: 1 addition & 0 deletions receiver/netflowreceiver/Makefile
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
include ../../Makefile.Common
102 changes: 102 additions & 0 deletions receiver/netflowreceiver/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
# Netflow receiver
<!-- status autogenerated section -->
| Status | |
| ------------- |-----------|
| Stability | [development]: logs |
| Distributions | [] |
| Issues | [![Open issues](https://img.shields.io/github/issues-search/open-telemetry/opentelemetry-collector-contrib?query=is%3Aissue%20is%3Aopen%20label%3Areceiver%2Fnetflow%20&label=open&color=orange&logo=opentelemetry)](https://github.com/open-telemetry/opentelemetry-collector-contrib/issues?q=is%3Aopen+is%3Aissue+label%3Areceiver%2Fnetflow) [![Closed issues](https://img.shields.io/github/issues-search/open-telemetry/opentelemetry-collector-contrib?query=is%3Aissue%20is%3Aclosed%20label%3Areceiver%2Fnetflow%20&label=closed&color=blue&logo=opentelemetry)](https://github.com/open-telemetry/opentelemetry-collector-contrib/issues?q=is%3Aclosed+is%3Aissue+label%3Areceiver%2Fnetflow) |
| [Code Owners](https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/main/CONTRIBUTING.md#becoming-a-code-owner) | [@evan-bradley](https://www.github.com/evan-bradley), [@dlopes7](https://www.github.com/dlopes7) |

[development]: https://github.com/open-telemetry/opentelemetry-collector/blob/main/docs/component-stability.md#development
<!-- end autogenerated section -->

The netflow receiver can listen for [netflow](https://en.wikipedia.org/wiki/NetFlow), [sflow](https://en.wikipedia.org/wiki/SFlow), and [ipfix](https://en.wikipedia.org/wiki/IP_Flow_Information_Export) data and convert it to OpenTelemetry logs. The receiver is based on the [goflow2](https://github.com/netsampler/goflow2) project.

This gives OpenTelemetry users the capability of monitoring network traffic, and answer questions like:

* Which protocols are passing through the network?
* Which servers and clients are producing the highest amount of traffic?
* What ports are involved in these network calls?
* How many bytes and packets are being sent and received?

## Getting started

By default the receiver will listen for ipfix and netflow on port `2055`. The receiver can be configured to listen on different ports and protocols.

Example configuration:

```yaml
receivers:
netflow:
- scheme: netflow
port: 2055
sockets: 16
workers: 32

processors:
batch:
send_batch_size: 2000
timeout: 30s

exporters:
debug:
verbosity: detailed

service:
pipelines:
logs:
receivers: [netflow]
processors: [batch]
exporters: [debug]
telemetry:
logs:
level: debug
```
We recommend using the batch processor to reduce the number of log requests being sent to the exporter. The batch processor will batch log records together and send them in a single request to the exporter.
You would then configure your network devices to send netflow, sflow, or ipfix data to the Collector on the specified ports.
## Configuration
| Field | Description | Examples | Default |
|-------|-------------|--------| ------- |
| scheme | The type of flow data that to receive | `sflow`, `netflow`, `flow` | `netflow` |
| hostname | The hostname or IP address to bind to | `localhost` | `0.0.0.0` |
| port | The port to bind to | `2055` or `6343` | `2055` |
| sockets | The number of sockets to use | 1 | 1 |
| workers | The number of workers used to decode incoming flow messages | 2 | 2 |
| queue_size | The size of the incoming netflow packets queue | 1000 | 1000000 |

## Data format

The netflow data is standardized for the different schemas and is converted to OpenTelemetry logs following the [semantic conventions](https://opentelemetry.io/docs/specs/semconv/general/attributes/#server-client-and-shared-network-attributes)

The output will adhere the format:

```json
{
"destination": {
"address": "192.168.0.1",
"port": 22
},
"flow": {
"end": 1731073104662487000,
"sampler_address": "192.168.0.2",
"sequence_num": 49,
"start": 1731073077662487000,
"time_received": 1731073138662487000,
"type": "NETFLOW_V5"
},
"io": {
"bytes": 529,
"packets": 378
},
"source": {
"address": "192.168.0.3",
"port": 40
},
"transport": "TCP",
"type": "IPv4"
}
```
65 changes: 65 additions & 0 deletions receiver/netflowreceiver/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package netflowreceiver // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/netflowreceiver"

import "fmt"

// Config represents the receiver config settings within the collector's config.yaml
type Config struct {
// The scheme defines the type of flow data that the listener will receive
// The scheme must be one of sflow, netflow, or flow
Scheme string `mapstructure:"scheme"`

// The hostname or IP address that the listener will bind to
Hostname string `mapstructure:"hostname"`

// The port that the listener will bind to
Port int `mapstructure:"port"`

// The number of sockets that the listener will use
Sockets int `mapstructure:"sockets"`

// The number of workers that the listener will use to decode incoming flow messages
// By default it will be two times the number of sockets
// Ideally set this to the number of CPU cores
Workers int `mapstructure:"workers"`

// The size of the queue that the listener will use
// This is a buffer that will hold flow messages before they are processed by a worker
QueueSize int `mapstructure:"queue_size"`
}

// Validate checks if the receiver configuration is valid
func (cfg *Config) Validate() error {
validSchemes := [3]string{"sflow", "netflow", "flow"}

validScheme := false
for _, scheme := range validSchemes {
if cfg.Scheme == scheme {
validScheme = true
break
}
}
if !validScheme {
return fmt.Errorf("scheme must be one of sflow, netflow, or flow")
}

if cfg.Sockets <= 0 {
return fmt.Errorf("sockets must be greater than 0")
}

if cfg.Workers <= 0 {
return fmt.Errorf("workers must be greater than 0")
}

if cfg.QueueSize <= 0 {
cfg.QueueSize = defaultQueueSize
}

if cfg.Port <= 0 {
return fmt.Errorf("port must be greater than 0")
}

return nil
}
92 changes: 92 additions & 0 deletions receiver/netflowreceiver/config_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package netflowreceiver

import (
"path/filepath"
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/confmap/confmaptest"

"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/netflowreceiver/internal/metadata"
)

func TestLoadConfig(t *testing.T) {
t.Parallel()

cm, err := confmaptest.LoadConf(filepath.Join("testdata", "config.yaml"))
require.NoError(t, err)

tests := []struct {
id component.ID
expected component.Config
}{
{
id: component.NewIDWithName(metadata.Type, "defaults"),
expected: createDefaultConfig(),
},
{
id: component.NewIDWithName(metadata.Type, "one_listener"),
expected: &Config{
Scheme: "netflow",
Port: 2055,
Sockets: 1,
Workers: 1,
QueueSize: 1000000,
},
},
}

for _, tt := range tests {
t.Run(tt.id.String(), func(t *testing.T) {
factory := NewFactory()
cfg := factory.CreateDefaultConfig()

sub, err := cm.Sub(tt.id.String())
require.NoError(t, err)
require.NoError(t, sub.Unmarshal(cfg))

assert.NoError(t, component.ValidateConfig(cfg))
assert.Equal(t, tt.expected, cfg)
})
}
}

func TestInvalidConfig(t *testing.T) {
t.Parallel()

cm, err := confmaptest.LoadConf(filepath.Join("testdata", "config.yaml"))
require.NoError(t, err)

tests := []struct {
id component.ID
err string
}{
{
id: component.NewIDWithName(metadata.Type, "invalid_schema"),
err: "scheme must be one of sflow, netflow, or flow",
},
{
id: component.NewIDWithName(metadata.Type, "invalid_port"),
err: "port must be greater than 0",
},
}

for _, tt := range tests {
t.Run(tt.id.String(), func(t *testing.T) {
factory := NewFactory()
cfg := factory.CreateDefaultConfig()

sub, err := cm.Sub(tt.id.String())
require.NoError(t, err)
require.NoError(t, sub.Unmarshal(cfg))

err = component.ValidateConfig(cfg)
assert.ErrorContains(t, err, tt.err)
})
}
}
6 changes: 6 additions & 0 deletions receiver/netflowreceiver/doc.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

//go:generate mdatagen metadata.yaml

package netflowreceiver // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/netflowreceiver"
51 changes: 51 additions & 0 deletions receiver/netflowreceiver/factory.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package netflowreceiver // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/netflowreceiver"

import (
"context"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/receiver"

"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/netflowreceiver/internal/metadata"
)

const (
defaultSockets = 1
defaultWorkers = 2
defaultQueueSize = 1_000_000
)

// NewFactory creates a factory for netflow receiver.
func NewFactory() receiver.Factory {
return receiver.NewFactory(
metadata.Type,
createDefaultConfig,
receiver.WithLogs(createLogsReceiver, metadata.LogsStability))
}

func createDefaultConfig() component.Config {
return &Config{
Scheme: "netflow",
Port: 2055,
Sockets: defaultSockets,
Workers: defaultWorkers,
QueueSize: defaultQueueSize,
}
}

func createLogsReceiver(_ context.Context, params receiver.Settings, cfg component.Config, consumer consumer.Logs) (receiver.Logs, error) {
logger := params.Logger
conf := cfg.(*Config)

nr := &netflowReceiver{
logger: logger,
logConsumer: consumer,
config: conf,
}

return nr, nil
}
Loading

0 comments on commit 5698c9d

Please sign in to comment.