Skip to content

Commit

Permalink
[filebeat][streaming] - Added support for TLS & forward proxy configs…
Browse files Browse the repository at this point in the history
… for websockets (#41934) (#42009)

(cherry picked from commit fd81074)

Co-authored-by: ShourieG <[email protected]>
  • Loading branch information
mergify[bot] and ShourieG authored Dec 12, 2024
1 parent e1ad660 commit cce3e18
Show file tree
Hide file tree
Showing 9 changed files with 402 additions and 6 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -215,6 +215,7 @@ https://github.com/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff]
- Add ability to remove request trace logs from http_endpoint input. {pull}40005[40005]
- Add ability to remove request trace logs from entityanalytics input. {pull}40004[40004]
- Improve S3 polling mode states registry when using list prefix option. {pull}41869[41869]
- Add support for SSL and Proxy configurations for websoket type in streaming input. {pull}41934[41934]
- AWS S3 input registry cleanup for untracked s3 objects. {pull}41694[41694]
- The environment variable `BEATS_AZURE_EVENTHUB_INPUT_TRACING_ENABLED: true` enables internal logs tracer for the azure-eventhub input. {issue}41931[41931] {pull}41932[41932]

Expand Down
22 changes: 22 additions & 0 deletions x-pack/filebeat/docs/inputs/input-streaming.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -349,6 +349,28 @@ The minimum time to wait between retries. This ensures that retries are spaced o

The maximum time to wait between retries. This prevents the retry mechanism from becoming too slow, ensuring that the client does not wait indefinitely between retries. This is crucial in systems where timeouts or user experience are critical. For example, `wait_max` might be set to 10 seconds, meaning that even if the calculated backoff is greater than this, the client will wait at most 10 seconds before retrying.

[float]
=== `timeout`
Timeout is the maximum amount of time the websocket dialer will wait for a connection to be established. The default value is `180` seconds.

[float]
==== `proxy_url`
This specifies the forward proxy URL to use for the connection. The `proxy_url` configuration is optional and can be used to configure the proxy settings for the connection. The `proxy_url` default value is set by `http.ProxyFromEnvironment` which reads the `HTTP_PROXY`, `HTTPS_PROXY`, and `NO_PROXY` environment variables.

[float]
==== `proxy_headers`
This specifies the headers to be sent to the proxy server. The `proxy_headers` configuration is optional and can be used to configure the headers to be sent to the proxy server.

[float]
==== `ssl`
This specifies the SSL configuration for the connection. The `ssl` configuration is optional and can be used to configure the SSL settings for the connection. The `ssl` configuration has the following subfields:

- `certificate_authorities`: A list of root certificates to use for verifying the server's certificate.
- `certificate`: The (PEM encoded) certificate to use for client authentication.
- `key`: The (PEM encoded) private key to use for client authentication.

If this is a self-signed certificate, the `certificate_authorities` field should be set to the certificate itself.

[float]
=== Metrics

Expand Down
11 changes: 9 additions & 2 deletions x-pack/filebeat/input/streaming/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,9 +41,8 @@ type config struct {
Redact *redact `config:"redact"`
// Retry is the configuration for retrying failed connections.
Retry *retry `config:"retry"`

// Transport is the common the transport config.
Transport httpcommon.HTTPTransportSettings `config:",inline"`

// CrowdstrikeAppID is the value used to set the
// appId request parameter in the FalconHose stream
// discovery request.
Expand Down Expand Up @@ -166,3 +165,11 @@ func checkURLScheme(c config) error {
return fmt.Errorf("unknown stream type: %s", c.Type)
}
}

func defaultConfig() config {
return config{
Transport: httpcommon.HTTPTransportSettings{
Timeout: 180 * time.Second,
},
}
}
2 changes: 1 addition & 1 deletion x-pack/filebeat/input/streaming/input_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ func NewInputManager(log *logp.Logger, store inputcursor.StateStore) InputManage
}

func cursorConfigure(cfg *conf.C) ([]inputcursor.Source, inputcursor.Input, error) {
src := &source{cfg: config{}}
src := &source{cfg: defaultConfig()}
if err := cfg.Unpack(&src.cfg); err != nil {
return nil, nil, err
}
Expand Down
249 changes: 249 additions & 0 deletions x-pack/filebeat/input/streaming/input_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ package streaming

import (
"context"
"crypto/tls"
"errors"
"fmt"
"net/http"
Expand Down Expand Up @@ -41,6 +42,7 @@ type WebSocketHandler func(*testing.T, *websocket.Conn, []string)
var inputTests = []struct {
name string
server func(*testing.T, WebSocketHandler, map[string]interface{}, []string)
proxyServer func(*testing.T, WebSocketHandler, map[string]interface{}, []string) *httptest.Server
handler WebSocketHandler
config map[string]interface{}
response []string
Expand Down Expand Up @@ -450,6 +452,140 @@ var inputTests = []struct {
},
wantErr: fmt.Errorf("failed to establish WebSocket connection after 2 attempts with error websocket: bad handshake"),
},
{
name: "single_event_tls",
server: webSocketServerWithTLS(httptest.NewUnstartedServer),
handler: defaultHandler,
config: map[string]interface{}{
"program": `
bytes(state.response).decode_json().as(inner_body,{
"events": [inner_body],
})`,
"ssl": map[string]interface{}{
"enabled": true,
"certificate_authorities": []string{"testdata/certs/ca.crt"},
"certificate": "testdata/certs/cert.pem",
"key": "testdata/certs/key.pem",
},
},
response: []string{`
{
"pps": {
"agent": "example.proofpoint.com",
"cid": "mmeng_uivm071"
},
"ts": "2017-08-17T14:54:12.949180-07:00",
"data": "2017-08-17T14:54:12.949180-07:00 example sendmail[30641]:v7HLqYbx029423: to=/dev/null, ctladdr=<[email protected]> (8/0),delay=00:00:00, xdelay=00:00:00, mailer=*file*, tls_verify=NONE, pri=35342,dsn=2.0.0, stat=Sent",
"sm": {
"tls": {
"verify": "NONE"
},
"stat": "Sent",
"qid": "v7HLqYbx029423",
"dsn": "2.0.0",
"mailer": "*file*",
"to": [
"/dev/null"
],
"ctladdr": "<[email protected]> (8/0)",
"delay": "00:00:00",
"xdelay": "00:00:00",
"pri": 35342
},
"id": "ZeYGULpZmL5N0151HN1OyA"
}`},
want: []map[string]interface{}{
{
"pps": map[string]interface{}{
"agent": "example.proofpoint.com",
"cid": "mmeng_uivm071",
},
"ts": "2017-08-17T14:54:12.949180-07:00",
"data": "2017-08-17T14:54:12.949180-07:00 example sendmail[30641]:v7HLqYbx029423: to=/dev/null, ctladdr=<[email protected]> (8/0),delay=00:00:00, xdelay=00:00:00, mailer=*file*, tls_verify=NONE, pri=35342,dsn=2.0.0, stat=Sent",
"sm": map[string]interface{}{
"tls": map[string]interface{}{
"verify": "NONE",
},
"stat": "Sent",
"qid": "v7HLqYbx029423",
"dsn": "2.0.0",
"mailer": "*file*",
"to": []interface{}{
"/dev/null",
},
"ctladdr": "<[email protected]> (8/0)",
"delay": "00:00:00",
"xdelay": "00:00:00",
"pri": float64(35342),
},
"id": "ZeYGULpZmL5N0151HN1OyA",
},
},
},
{
name: "basic_proxy_forwarding",
proxyServer: newWebSocketProxyTestServer,
handler: defaultHandler,
config: map[string]interface{}{
"program": `
bytes(state.response).decode_json().as(inner_body,{
"events": [inner_body],
})`,
},
response: []string{`
{
"pps": {
"agent": "example.proofpoint.com",
"cid": "mmeng_uivm071"
},
"ts": "2017-08-17T14:54:12.949180-07:00",
"data": "2017-08-17T14:54:12.949180-07:00 example sendmail[30641]:v7HLqYbx029423: to=/dev/null, ctladdr=<[email protected]> (8/0),delay=00:00:00, xdelay=00:00:00, mailer=*file*, tls_verify=NONE, pri=35342,dsn=2.0.0, stat=Sent",
"sm": {
"tls": {
"verify": "NONE"
},
"stat": "Sent",
"qid": "v7HLqYbx029423",
"dsn": "2.0.0",
"mailer": "*file*",
"to": [
"/dev/null"
],
"ctladdr": "<[email protected]> (8/0)",
"delay": "00:00:00",
"xdelay": "00:00:00",
"pri": 35342
},
"id": "ZeYGULpZmL5N0151HN1OyA"
}`},
want: []map[string]interface{}{
{
"pps": map[string]interface{}{
"agent": "example.proofpoint.com",
"cid": "mmeng_uivm071",
},
"ts": "2017-08-17T14:54:12.949180-07:00",
"data": "2017-08-17T14:54:12.949180-07:00 example sendmail[30641]:v7HLqYbx029423: to=/dev/null, ctladdr=<[email protected]> (8/0),delay=00:00:00, xdelay=00:00:00, mailer=*file*, tls_verify=NONE, pri=35342,dsn=2.0.0, stat=Sent",
"sm": map[string]interface{}{
"tls": map[string]interface{}{
"verify": "NONE",
},
"stat": "Sent",
"qid": "v7HLqYbx029423",
"dsn": "2.0.0",
"mailer": "*file*",
"to": []interface{}{
"/dev/null",
},
"ctladdr": "<[email protected]> (8/0)",
"delay": "00:00:00",
"xdelay": "00:00:00",
"pri": float64(35342),
},
"id": "ZeYGULpZmL5N0151HN1OyA",
},
},
},
}

var urlEvalTests = []struct {
Expand Down Expand Up @@ -560,6 +696,9 @@ func TestInput(t *testing.T) {
if test.server != nil {
test.server(t, test.handler, test.config, test.response)
}
if test.proxyServer != nil {
test.proxyServer(t, test.handler, test.config, test.response)
}

cfg := conf.MustNewConfigFrom(test.config)

Expand Down Expand Up @@ -771,6 +910,46 @@ func webSocketServerWithRetry(serve func(http.Handler) *httptest.Server) func(*t
}
}

// webSocketServerWithTLS simulates a WebSocket server with TLS based authentication.
func webSocketServerWithTLS(serve func(http.Handler) *httptest.Server) func(*testing.T, WebSocketHandler, map[string]interface{}, []string) {
return func(t *testing.T, handler WebSocketHandler, config map[string]interface{}, response []string) {
server := serve(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
upgrader := websocket.Upgrader{
CheckOrigin: func(r *http.Request) bool {
return true
},
}

conn, err := upgrader.Upgrade(w, r, nil)
if err != nil {
t.Fatalf("error upgrading connection to WebSocket: %v", err)
return
}

handler(t, conn, response)
}))
//nolint:gosec // there is no need to use a secure cert for testing
server.TLS = &tls.Config{
Certificates: []tls.Certificate{generateSelfSignedCert(t)},
}
server.StartTLS()

if config["url"] == nil {
config["url"] = "ws" + server.URL[4:]
}
t.Cleanup(server.Close)
}
}

// generateSelfSignedCert returns a self-signed certificate for testing purposes based on the dummy certs in the testdata directory
func generateSelfSignedCert(t *testing.T) tls.Certificate {
cert, err := tls.LoadX509KeyPair("testdata/certs/cert.pem", "testdata/certs/key.pem")
if err != nil {
t.Fatalf("failed to generate self-signed cert: %v", err)
}
return cert
}

// defaultHandler is a default handler for WebSocket connections.
func defaultHandler(t *testing.T, conn *websocket.Conn, response []string) {
for _, r := range response {
Expand All @@ -780,3 +959,73 @@ func defaultHandler(t *testing.T, conn *websocket.Conn, response []string) {
}
}
}

// webSocketTestServer creates a WebSocket target server that communicates with the proxy handler.
func webSocketTestServer(t *testing.T, handler WebSocketHandler, response []string) *httptest.Server {
return httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
upgrader := websocket.Upgrader{
CheckOrigin: func(r *http.Request) bool {
return true
},
}
conn, err := upgrader.Upgrade(w, r, nil)
if err != nil {
t.Fatalf("failed to upgrade WebSocket connection: %v", err)
return
}
handler(t, conn, response)
}))
}

// webSocketProxyHandler forwards WebSocket connections to the target server.
//
//nolint:errcheck //we can safely ignore errors checks here
func webSocketProxyHandler(targetURL string) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
defer r.Response.Body.Close()
//nolint:bodyclose // we can ignore the body close here
targetConn, _, err := websocket.DefaultDialer.Dial(targetURL, nil)
if err != nil {
http.Error(w, "failed to connect to backend WebSocket server", http.StatusBadGateway)
return
}
defer targetConn.Close()

upgrader := websocket.Upgrader{
CheckOrigin: func(r *http.Request) bool {
return true
},
}
clientConn, err := upgrader.Upgrade(w, r, nil)
if err != nil {
http.Error(w, "failed to upgrade client connection", http.StatusInternalServerError)
return
}
defer clientConn.Close()
// forward messages between client and target server
go func() {
for {
messageType, message, err := targetConn.ReadMessage()
if err != nil {
break
}
clientConn.WriteMessage(messageType, message)
}
}()
for {
messageType, message, err := clientConn.ReadMessage()
if err != nil {
break
}
targetConn.WriteMessage(messageType, message)
}
}
}

// newWebSocketProxyTestServer creates a proxy server forwarding WebSocket traffic.
func newWebSocketProxyTestServer(t *testing.T, handler WebSocketHandler, config map[string]interface{}, response []string) *httptest.Server {
backendServer := webSocketTestServer(t, handler, response)
config["url"] = "ws" + backendServer.URL[4:]
config["proxy_url"] = "ws" + backendServer.URL[4:]
return httptest.NewServer(webSocketProxyHandler(config["url"].(string)))
}
21 changes: 21 additions & 0 deletions x-pack/filebeat/input/streaming/testdata/certs/ca.crt
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
-----BEGIN CERTIFICATE-----
MIIDbTCCAlWgAwIBAgIUS/rm8sWDc2a+eD9L+q+9XQpBa5MwDQYJKoZIhvcNAQEL
BQAwRTELMAkGA1UEBhMCQVUxEzARBgNVBAgMClNvbWUtU3RhdGUxITAfBgNVBAoM
GEludGVybmV0IFdpZGdpdHMgUHR5IEx0ZDAgFw0yNDEyMDUxMjM4NThaGA8yMTI0
MTExMTEyMzg1OFowRTELMAkGA1UEBhMCQVUxEzARBgNVBAgMClNvbWUtU3RhdGUx
ITAfBgNVBAoMGEludGVybmV0IFdpZGdpdHMgUHR5IEx0ZDCCASIwDQYJKoZIhvcN
AQEBBQADggEPADCCAQoCggEBALhEaFVqFuYwSwH4GHhMeqhilC+sWXKaQP8QmaH7
HWRST8Ko6YTT9NixUL4Qs5OmzCQFavRN9qtEo4wtqCJBOEyXQG1wAHuLWIY+KOCB
twUg8fP+uYaYUOQOYNLkBz7SLlejuZYTyGxepIkc+UeJRcOE36anIPHpc2KSr3Hm
vKJxZUVpQEbJvQ7pe7+iLL4jSOfzpQNcV9S/bzTo6taZXuo+ryEPlshkU/ME5VCN
LFrU3AW2fzKW0Xa/skkW5izCiAU8KNEy84UQM6aZkJfFi9O394i97sGgYg+q36XL
sXbZ+sCXHI3CGx+pwOx0h7S8n7iJJ7BbmwM6QuLFF6bFYkkCAwEAAaNTMFEwHQYD
VR0OBBYEFEHtfvey8SdncMr7VDqA2YhtEiGYMB8GA1UdIwQYMBaAFEHtfvey8Sdn
cMr7VDqA2YhtEiGYMA8GA1UdEwEB/wQFMAMBAf8wDQYJKoZIhvcNAQELBQADggEB
AKFcAeh9yhIbkkxvXl6ebmLcj817NVjhpcvDZlKP2MVu+w/h70w+JwBktLUlZlXd
UNkKEWZyRvrdmY+YN6rwY/QI75N17bcmDWy6QnNlVJF0AJkBSdbKonCgHrZm7K25
TOKpj0QF8l7k9wr5FWHHcBw/vFF9cGZ5TO4HbnI25N/cEKgdzZFEVA5Y/Rv7GIGU
COjJG20Cr2HIKvVYoyWvN6sL7+gbzUMyjvQyGMCT7YoIqscUfrUU+T46QaOLAKa3
z91Obfmv6uTO/rsieoxVWVJ35GeHeNJkAPkr7Z1sWIrreJ/3WsecWuPPEDNDXiSV
5h0bTbbPOyEIe5ydEIbr5kA=
-----END CERTIFICATE-----
Loading

0 comments on commit cce3e18

Please sign in to comment.