Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[filebeat] Add a configuration option for TCP/UDP network type #40623

Merged
merged 7 commits into from
Aug 27, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG-developer.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,7 @@ The list below covers the major changes between 7.0.0-rc2 and main only.
- Added `ignore_empty_values` flag in `decode_cef` Filebeat processor. {pull}40268[40268]
- Bump version of elastic/toutoumomoma to remove internal forks of stdlib debug packages. {pull}40325[40325]
- Refactor x-pack/filebeat/input/websocket for generalisation. {pull}40308[40308]
- Add a configuration option for TCP/UDP network type. {issue}40407[40407] {pull}40623[40623]

==== Deprecated

Expand Down
6 changes: 6 additions & 0 deletions filebeat/docs/inputs/input-common-tcp-options.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,12 @@ The maximum size of the message received over TCP. The default is `20MiB`.

The host and TCP port to listen on for event streams.

[float]
[id="{beatname_lc}-input-{type}-tcp-network"]
==== `network`

The network type. Acceptable values are: "tcp" (default), "tcp4", "tcp6"

[float]
[id="{beatname_lc}-input-{type}-tcp-framing"]
==== `framing`
Expand Down
6 changes: 6 additions & 0 deletions filebeat/docs/inputs/input-common-udp-options.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,12 @@ The maximum size of the message received over UDP. The default is `10KiB`.

The host and UDP port to listen on for event streams.

[float]
[id="{beatname_lc}-input-{type}-udp-network"]
==== `network`

The network type. Acceptable values are: "udp" (default), "udp4", "udp6"

[float]
[id="{beatname_lc}-input-{type}-udp-read-buffer"]
==== `read_buffer`
Expand Down
20 changes: 19 additions & 1 deletion filebeat/inputsource/tcp/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package tcp

import (
"errors"
"fmt"
"time"

Expand All @@ -35,12 +36,29 @@ type Config struct {
MaxMessageSize cfgtype.ByteSize `config:"max_message_size" validate:"nonzero,positive"`
MaxConnections int `config:"max_connections"`
TLS *tlscommon.ServerConfig `config:"ssl"`
Network string `config:"network"`
}

const (
networkTCP = "tcp"
networkTCP4 = "tcp4"
networkTCP6 = "tcp6"
)

var (
ErrInvalidNetwork = errors.New("invalid network value")
ErrMissingHostPort = errors.New("need to specify the host using the `host:port` syntax")
)

// Validate validates the Config option for the tcp input.
func (c *Config) Validate() error {
if len(c.Host) == 0 {
return fmt.Errorf("need to specify the host using the `host:port` syntax")
return ErrMissingHostPort
}
switch c.Network {
case "", networkTCP, networkTCP4, networkTCP6:
default:
return fmt.Errorf("%w: %s, expected: %v or %v or %v ", ErrInvalidNetwork, c.Network, networkTCP, networkTCP4, networkTCP6)
}
return nil
}
74 changes: 74 additions & 0 deletions filebeat/inputsource/tcp/config_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package tcp

import (
"testing"

"github.com/google/go-cmp/cmp"
"github.com/google/go-cmp/cmp/cmpopts"
)

func TestValidate(t *testing.T) {
type testCfg struct {
name string
cfg Config
wantErr error
}

tests := []testCfg{
{
name: "ok",
cfg: Config{
Host: "localhost:9000",
},
},
{
name: "nohost",
wantErr: ErrMissingHostPort,
},
{
name: "invalidnetwork",
cfg: Config{
Host: "localhost:9000",
Network: "foo",
},
wantErr: ErrInvalidNetwork,
},
}

for _, network := range []string{networkTCP, networkTCP4, networkTCP6} {
tests = append(tests, testCfg{
name: "network_" + network,
cfg: Config{
Host: "localhost:9000",
Network: network,
},
})
}

for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
err := tc.cfg.Validate()
diff := cmp.Diff(tc.wantErr, err, cmpopts.EquateErrors())
if diff != "" {
t.Fatal(diff)
}
})
}
}
12 changes: 10 additions & 2 deletions filebeat/inputsource/tcp/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,14 +67,15 @@ func New(
func (s *Server) createServer() (net.Listener, error) {
var l net.Listener
var err error
network := s.network()
if s.tlsConfig != nil {
t := s.tlsConfig.BuildServerConfig(s.config.Host)
l, err = tls.Listen("tcp", s.config.Host, t)
l, err = tls.Listen(network, s.config.Host, t)
if err != nil {
return nil, err
}
} else {
l, err = net.Listen("tcp", s.config.Host)
l, err = net.Listen(network, s.config.Host)
if err != nil {
return nil, err
}
Expand All @@ -85,3 +86,10 @@ func (s *Server) createServer() (net.Listener, error) {
}
return l, nil
}

func (s *Server) network() string {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is style/consistenty comment. I'm ok if you leave it as is, just want to bring it up to consider.

A common theme in the beats code base for default values is to make a defaultConfig function, that returns the struct with the default value filled in. And then when Unpack is run, if the supplied config didn't have the value, you get the default. And validate can work on the real values you want to check. An example would be:

func configure(cfg *conf.C) (stateless.Input, error) {
config := defaultConfig()
if err := cfg.Unpack(&config); err != nil {
return nil, err
}
return newServer(config)
}
func defaultConfig() config {
return config{
Config: tcp.Config{
Timeout: time.Minute * 5,
MaxMessageSize: 20 * humanize.MiByte,
},
LineDelimiter: "\n",
}
}

what do you think about using that pattern here?

Copy link
Member Author

@aleksmaus aleksmaus Aug 27, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, I saw this pattern, was not sure if that is consistent across the board and if nobody is going to ask why we do set the configuration option when it's was not set.
One example:
https://github.com/elastic/beats/blob/main/filebeat/inputsource/udp/server_test.go#L62

The original ticket requirement was:

"When no option is provided, the default value (udp or tcp) is used"

So, took it literally in this case: not changing the config, and using the default network type "When no option is provided". I'd probably still leave this network() function at the point when the server is created in case if the config value gets messed up on a way to the server initialization.

if s.config.Network != "" {
return s.config.Network
}
return networkTCP
}
13 changes: 11 additions & 2 deletions filebeat/inputsource/tcp/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,13 @@ func TestErrorOnEmptyLineDelimiter(t *testing.T) {
}

func TestReceiveEventsAndMetadata(t *testing.T) {
// Excluding tcp6 for now, since it fails in our CI
for _, network := range []string{networkTCP, networkTCP4} {
testReceiveEventsAndMetadata(t, network)
}
}

func testReceiveEventsAndMetadata(t *testing.T, network string) {
expectedMessages := generateMessages(5, 100)
largeMessages := generateMessages(10, 4096)
extraLargeMessages := generateMessages(2, 65*1024)
Expand Down Expand Up @@ -220,6 +227,7 @@ func TestReceiveEventsAndMetadata(t *testing.T) {
if !assert.NoError(t, err) {
return
}
config.Network = network

splitFunc, err := streaming.SplitFunc(test.framing, test.delimiter)
if !assert.NoError(t, err) {
Expand All @@ -237,7 +245,8 @@ func TestReceiveEventsAndMetadata(t *testing.T) {
}
defer server.Stop()

conn, err := net.Dial("tcp", server.Listener.Listener.Addr().String())
addr := server.Listener.Listener.Addr().String()
conn, err := net.Dial(network, addr)
require.NoError(t, err)
fmt.Fprint(conn, test.messageSent)
conn.Close()
Expand Down Expand Up @@ -294,8 +303,8 @@ func TestReceiveNewEventsConcurrently(t *testing.T) {
for w := 0; w < workers; w++ {
go func() {
conn, err := net.Dial("tcp", server.Listener.Listener.Addr().String())
defer conn.Close()
assert.NoError(t, err)
defer conn.Close()
for _, sample := range samples {
fmt.Fprintln(conn, sample)
}
Expand Down
21 changes: 21 additions & 0 deletions filebeat/inputsource/udp/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
package udp

import (
"errors"
"fmt"
"time"

"github.com/elastic/beats/v7/libbeat/common/cfgtype"
Expand All @@ -29,4 +31,23 @@ type Config struct {
MaxMessageSize cfgtype.ByteSize `config:"max_message_size" validate:"positive,nonzero"`
Timeout time.Duration `config:"timeout"`
ReadBuffer cfgtype.ByteSize `config:"read_buffer" validate:"positive"`
Network string `config:"network"`
}

const (
networkUDP = "udp"
networkUDP4 = "udp4"
networkUDP6 = "udp6"
)

var ErrInvalidNetwork = errors.New("invalid network value")

// Validate validates the Config option for the udp input.
func (c *Config) Validate() error {
switch c.Network {
case "", networkUDP, networkUDP4, networkUDP6:
default:
return fmt.Errorf("%w: %s, expected: %v or %v or %v", ErrInvalidNetwork, c.Network, networkUDP, networkUDP4, networkUDP6)
}
return nil
}
70 changes: 70 additions & 0 deletions filebeat/inputsource/udp/config_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package udp

import (
"testing"

"github.com/google/go-cmp/cmp"
"github.com/google/go-cmp/cmp/cmpopts"
)

func TestValidate(t *testing.T) {
type testCfg struct {
name string
cfg Config
wantErr error
}

tests := []testCfg{
{
name: "ok",
cfg: Config{
Host: "localhost:8080",
},
},
{
name: "invalidnetwork",
cfg: Config{
Host: "localhost:8080",
Network: "foo",
},
wantErr: ErrInvalidNetwork,
},
}

for _, network := range []string{networkUDP, networkUDP4, networkUDP6} {
tests = append(tests, testCfg{
name: "network_" + network,
cfg: Config{
Host: "localhost:8080",
Network: network,
},
})
}

for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
err := tc.cfg.Validate()
diff := cmp.Diff(tc.wantErr, err, cmpopts.EquateErrors())
if diff != "" {
t.Fatal(diff)
}
})
}
}
12 changes: 10 additions & 2 deletions filebeat/inputsource/udp/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,11 +53,12 @@ func New(config *Config, callback inputsource.NetworkFunc) *Server {

func (u *Server) createConn() (net.PacketConn, error) {
var err error
udpAdddr, err := net.ResolveUDPAddr("udp", u.config.Host)
network := u.network()
udpAdddr, err := net.ResolveUDPAddr(network, u.config.Host)
if err != nil {
return nil, err
}
listener, err := net.ListenUDP("udp", udpAdddr)
listener, err := net.ListenUDP(network, udpAdddr)
if err != nil {
return nil, err
}
Expand All @@ -71,3 +72,10 @@ func (u *Server) createConn() (net.PacketConn, error) {

return listener, err
}

func (u *Server) network() string {
if u.config.Network != "" {
return u.config.Network
}
return networkUDP
}
12 changes: 11 additions & 1 deletion filebeat/inputsource/udp/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,15 @@ type info struct {
}

func TestReceiveEventFromUDP(t *testing.T) {
// Excluding udp6 for now, since it fails in our CI
for _, network := range []string{networkUDP, networkUDP4} {
t.Run(network, func(t *testing.T) {
testReceiveEventFromUDPWithNetwork(t, network)
})
}
}

func testReceiveEventFromUDPWithNetwork(t *testing.T, network string) {
tests := []struct {
name string
message []byte
Expand All @@ -64,6 +73,7 @@ func TestReceiveEventFromUDP(t *testing.T) {
MaxMessageSize: maxMessageSize,
Timeout: timeout,
ReadBuffer: maxSocketSize,
Network: network,
}
fn := func(message []byte, metadata inputsource.NetworkMetadata) {
ch <- info{message: message, mt: metadata}
Expand All @@ -77,7 +87,7 @@ func TestReceiveEventFromUDP(t *testing.T) {

for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
conn, err := net.Dial("udp", s.localaddress)
conn, err := net.Dial(s.network(), s.localaddress)
if !assert.NoError(t, err) {
return
}
Expand Down
Loading