Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[filebeat][streaming] - Added support for TLS & forward proxy configs for websockets #41934

Merged
merged 9 commits into from
Dec 11, 2024
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -357,6 +357,7 @@ https://github.com/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff]
- Add evaluation state dump debugging option to CEL input. {pull}41335[41335]
- Added support for retry configuration in GCS input. {issue}11580[11580] {pull}41862[41862]
- Improve S3 polling mode states registry when using list prefix option. {pull}41869[41869]
- Add support for SSL and Proxy configurations for websoket type in streaming input. {pull}41934[41934]

*Auditbeat*

Expand Down
26 changes: 26 additions & 0 deletions x-pack/filebeat/docs/inputs/input-streaming.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -349,6 +349,32 @@ The minimum time to wait between retries. This ensures that retries are spaced o

The maximum time to wait between retries. This prevents the retry mechanism from becoming too slow, ensuring that the client does not wait indefinitely between retries. This is crucial in systems where timeouts or user experience are critical. For example, `wait_max` might be set to 10 seconds, meaning that even if the calculated backoff is greater than this, the client will wait at most 10 seconds before retrying.

[float]
==== `handshake_timeout`
This specifies the time to wait for the `websocket` handshake and the `http.Upgrade()` operation to complete. This timeout occurs at the application layer and not at the TCP layer. The `default value` is `20` seconds. This setting is specific to the `websocket` streaming input type only.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
This specifies the time to wait for the `websocket` handshake and the `http.Upgrade()` operation to complete. This timeout occurs at the application layer and not at the TCP layer. The `default value` is `20` seconds. This setting is specific to the `websocket` streaming input type only.
This specifies the time to wait for the WebSocket handshake and protocol upgrade to complete. This timeout occurs at the application layer and not at the TCP layer. The `default value` is `20` seconds. This setting is specific to the `websocket` streaming input type only.

Referring to a call is unhelpful to users; discuss the intent not the mechanism in user-facing documentation. Also, it was incorrectly referring to http.Upgrade which does not exist. It is used by websocket.Upgrader.Upgrade, but we don't call that except in testing, instead using websocket.Dialer.DialContext (should we refer to upgrading at all? I think this is an implementation detail and only tenuously relevant). Note that the lib's default is 45s; do we want to match that or is there a reason to use 20s?

I'm curious, is it meaningful to have a handshake timeout that is less than the TCP timeout? Looking at the implementation of the client, the handshake timeout is used to construct a deadline context that must lose the race to return from DialContext, and this context is passed into the net.Dialer.DialContext call, meaning that obtaining the netconn must fit within this deadline. Where does the timeout below come in?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@efd6,
What I was going for initially was faster failures at the application layer to achieve better program responsiveness. But then I realised that we might want to wait for TCP retires to overcome any intermittent network issues present and setting a handshake timeout lesser than the tcp timeout defeats that purpose.

The timeout option below actually controls the net.Dialer.DialContext timeout, which by default is set to 90s, half of the os default for TCP timeout window.

So my question is, should we extend the DialContext and handshake timeouts to mirror the TCP time out value of the os ?

As for the documentation, I'll update it.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I suggest taking a look at the gorilla code to see how the timeout is propagated.

Copy link
Contributor Author

@ShourieG ShourieG Dec 10, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@efd6, I saw the gorilla code just creates a context deadline out of the provided timeout in the dialcontext. So in this case having the handshake value < the dial context doesn't make much sense by default. We can keep them at the same value then if the user feels they need to change they can do that.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would not bother adding the handshake timeout since we already provide a mechanism to hand in a dial timeout which achieves the same end.


[float]
=== `timeout`
This specifies the timeout that occurs after the initial handshake has taken place. This timeout occurs at the TCP layer and deals with the time taken to establish the actual tcp connection. The `default value` is `90` seconds.

[float]
==== `proxy_url`
This specifies the forward proxy URL to use for the connection. The `proxy_url` configuration is optional and can be used to configure the proxy settings for the connection. The `proxy_url` default value is set by `http.ProxyFromEnvironment` which reads the `HTTP_PROXY`, `HTTPS_PROXY`, and `NO_PROXY` environment variables.

[float]
==== `proxy_headers`
This specifies the headers to be sent to the proxy server. The `proxy_headers` configuration is optional and can be used to configure the headers to be sent to the proxy server.

[float]
==== `ssl`
This specifies the SSL configuration for the connection. The `ssl` configuration is optional and can be used to configure the SSL settings for the connection. The `ssl` configuration has the following subfields:

- `certificate_authorities`: A list of root certificates to use for verifying the server's certificate.
- `certificate`: The (PEM encoded) certificate to use for client authentication.
- `key`: The (PEM encoded) private key to use for client authentication.

If this is a self-signed certificate, the `certificate_authorities` field should be set to the certificate itself.

[float]
=== Metrics

Expand Down
21 changes: 18 additions & 3 deletions x-pack/filebeat/input/streaming/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,9 +41,8 @@ type config struct {
Redact *redact `config:"redact"`
// Retry is the configuration for retrying failed connections.
Retry *retry `config:"retry"`

Transport httpcommon.HTTPTransportSettings `config:",inline"`

// Transport is the HTTP transport configuration.
Transport transport `config:",inline"`
// CrowdstrikeAppID is the value used to set the
// appId request parameter in the FalconHose stream
// discovery request.
Expand Down Expand Up @@ -95,6 +94,11 @@ type urlConfig struct {
*url.URL
}

type transport struct {
httpcommon.HTTPTransportSettings `config:",inline"`
HandShakeTimeOut time.Duration `config:"handshake_timeout" validate:"min=10"`
}

func (u *urlConfig) Unpack(in string) error {
parsed, err := url.Parse(in)
if err != nil {
Expand Down Expand Up @@ -166,3 +170,14 @@ func checkURLScheme(c config) error {
return fmt.Errorf("unknown stream type: %s", c.Type)
}
}

func defaultConfig() config {
return config{
Transport: transport{
HTTPTransportSettings: httpcommon.HTTPTransportSettings{
Timeout: 90 * time.Second,
},
HandShakeTimeOut: 20 * time.Second,
},
}
}
2 changes: 1 addition & 1 deletion x-pack/filebeat/input/streaming/input_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ func NewInputManager(log *logp.Logger, store inputcursor.StateStore) InputManage
}

func cursorConfigure(cfg *conf.C) ([]inputcursor.Source, inputcursor.Input, error) {
src := &source{cfg: config{}}
src := &source{cfg: defaultConfig()}
if err := cfg.Unpack(&src.cfg); err != nil {
return nil, nil, err
}
Expand Down
249 changes: 249 additions & 0 deletions x-pack/filebeat/input/streaming/input_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ package streaming

import (
"context"
"crypto/tls"
"errors"
"fmt"
"net/http"
Expand Down Expand Up @@ -41,6 +42,7 @@ type WebSocketHandler func(*testing.T, *websocket.Conn, []string)
var inputTests = []struct {
name string
server func(*testing.T, WebSocketHandler, map[string]interface{}, []string)
proxyServer func(*testing.T, WebSocketHandler, map[string]interface{}, []string) *httptest.Server
handler WebSocketHandler
config map[string]interface{}
response []string
Expand Down Expand Up @@ -450,6 +452,140 @@ var inputTests = []struct {
},
wantErr: fmt.Errorf("failed to establish WebSocket connection after 2 attempts with error websocket: bad handshake"),
},
{
name: "single_event_tls",
server: webSocketServerWithTLS(httptest.NewUnstartedServer),
handler: defaultHandler,
config: map[string]interface{}{
"program": `
bytes(state.response).decode_json().as(inner_body,{
"events": [inner_body],
})`,
"ssl": map[string]interface{}{
"enabled": true,
"certificate_authorities": []string{"testdata/certs/ca.crt"},
"certificate": "testdata/certs/cert.pem",
"key": "testdata/certs/key.pem",
},
},
response: []string{`
{
"pps": {
"agent": "example.proofpoint.com",
"cid": "mmeng_uivm071"
},
"ts": "2017-08-17T14:54:12.949180-07:00",
"data": "2017-08-17T14:54:12.949180-07:00 example sendmail[30641]:v7HLqYbx029423: to=/dev/null, ctladdr=<[email protected]> (8/0),delay=00:00:00, xdelay=00:00:00, mailer=*file*, tls_verify=NONE, pri=35342,dsn=2.0.0, stat=Sent",
"sm": {
"tls": {
"verify": "NONE"
},
"stat": "Sent",
"qid": "v7HLqYbx029423",
"dsn": "2.0.0",
"mailer": "*file*",
"to": [
"/dev/null"
],
"ctladdr": "<[email protected]> (8/0)",
"delay": "00:00:00",
"xdelay": "00:00:00",
"pri": 35342
},
"id": "ZeYGULpZmL5N0151HN1OyA"
}`},
want: []map[string]interface{}{
{
"pps": map[string]interface{}{
"agent": "example.proofpoint.com",
"cid": "mmeng_uivm071",
},
"ts": "2017-08-17T14:54:12.949180-07:00",
"data": "2017-08-17T14:54:12.949180-07:00 example sendmail[30641]:v7HLqYbx029423: to=/dev/null, ctladdr=<[email protected]> (8/0),delay=00:00:00, xdelay=00:00:00, mailer=*file*, tls_verify=NONE, pri=35342,dsn=2.0.0, stat=Sent",
"sm": map[string]interface{}{
"tls": map[string]interface{}{
"verify": "NONE",
},
"stat": "Sent",
"qid": "v7HLqYbx029423",
"dsn": "2.0.0",
"mailer": "*file*",
"to": []interface{}{
"/dev/null",
},
"ctladdr": "<[email protected]> (8/0)",
"delay": "00:00:00",
"xdelay": "00:00:00",
"pri": float64(35342),
},
"id": "ZeYGULpZmL5N0151HN1OyA",
},
},
},
{
name: "basic_proxy_forwarding",
proxyServer: newWebSocketProxyTestServer,
handler: defaultHandler,
config: map[string]interface{}{
"program": `
bytes(state.response).decode_json().as(inner_body,{
"events": [inner_body],
})`,
},
response: []string{`
{
"pps": {
"agent": "example.proofpoint.com",
"cid": "mmeng_uivm071"
},
"ts": "2017-08-17T14:54:12.949180-07:00",
"data": "2017-08-17T14:54:12.949180-07:00 example sendmail[30641]:v7HLqYbx029423: to=/dev/null, ctladdr=<[email protected]> (8/0),delay=00:00:00, xdelay=00:00:00, mailer=*file*, tls_verify=NONE, pri=35342,dsn=2.0.0, stat=Sent",
"sm": {
"tls": {
"verify": "NONE"
},
"stat": "Sent",
"qid": "v7HLqYbx029423",
"dsn": "2.0.0",
"mailer": "*file*",
"to": [
"/dev/null"
],
"ctladdr": "<[email protected]> (8/0)",
"delay": "00:00:00",
"xdelay": "00:00:00",
"pri": 35342
},
"id": "ZeYGULpZmL5N0151HN1OyA"
}`},
want: []map[string]interface{}{
{
"pps": map[string]interface{}{
"agent": "example.proofpoint.com",
"cid": "mmeng_uivm071",
},
"ts": "2017-08-17T14:54:12.949180-07:00",
"data": "2017-08-17T14:54:12.949180-07:00 example sendmail[30641]:v7HLqYbx029423: to=/dev/null, ctladdr=<[email protected]> (8/0),delay=00:00:00, xdelay=00:00:00, mailer=*file*, tls_verify=NONE, pri=35342,dsn=2.0.0, stat=Sent",
"sm": map[string]interface{}{
"tls": map[string]interface{}{
"verify": "NONE",
},
"stat": "Sent",
"qid": "v7HLqYbx029423",
"dsn": "2.0.0",
"mailer": "*file*",
"to": []interface{}{
"/dev/null",
},
"ctladdr": "<[email protected]> (8/0)",
"delay": "00:00:00",
"xdelay": "00:00:00",
"pri": float64(35342),
},
"id": "ZeYGULpZmL5N0151HN1OyA",
},
},
},
}

var urlEvalTests = []struct {
Expand Down Expand Up @@ -560,6 +696,9 @@ func TestInput(t *testing.T) {
if test.server != nil {
test.server(t, test.handler, test.config, test.response)
}
if test.proxyServer != nil {
test.proxyServer(t, test.handler, test.config, test.response)
}

cfg := conf.MustNewConfigFrom(test.config)

Expand Down Expand Up @@ -771,6 +910,46 @@ func webSocketServerWithRetry(serve func(http.Handler) *httptest.Server) func(*t
}
}

// webSocketServerWithTLS simulates a WebSocket server with TLS based authentication.
func webSocketServerWithTLS(serve func(http.Handler) *httptest.Server) func(*testing.T, WebSocketHandler, map[string]interface{}, []string) {
return func(t *testing.T, handler WebSocketHandler, config map[string]interface{}, response []string) {
server := serve(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
upgrader := websocket.Upgrader{
CheckOrigin: func(r *http.Request) bool {
return true
},
}

conn, err := upgrader.Upgrade(w, r, nil)
if err != nil {
t.Fatalf("error upgrading connection to WebSocket: %v", err)
return
}

handler(t, conn, response)
}))
//nolint:gosec // there is no need to use a secure cert for testing
server.TLS = &tls.Config{
Certificates: []tls.Certificate{generateSelfSignedCert(t)},
}
server.StartTLS()

if config["url"] == nil {
config["url"] = "ws" + server.URL[4:]
}
t.Cleanup(server.Close)
}
}

// generateSelfSignedCert returns a self-signed certificate for testing purposes based on the dummy certs in the testdata directory
func generateSelfSignedCert(t *testing.T) tls.Certificate {
cert, err := tls.LoadX509KeyPair("testdata/certs/cert.pem", "testdata/certs/key.pem")
if err != nil {
t.Fatalf("failed to generate self-signed cert: %v", err)
}
return cert
}

// defaultHandler is a default handler for WebSocket connections.
func defaultHandler(t *testing.T, conn *websocket.Conn, response []string) {
for _, r := range response {
Expand All @@ -780,3 +959,73 @@ func defaultHandler(t *testing.T, conn *websocket.Conn, response []string) {
}
}
}

// webSocketTestServer creates a WebSocket target server that communicates with the proxy handler.
func webSocketTestServer(t *testing.T, handler WebSocketHandler, response []string) *httptest.Server {
return httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
upgrader := websocket.Upgrader{
CheckOrigin: func(r *http.Request) bool {
return true
},
}
conn, err := upgrader.Upgrade(w, r, nil)
if err != nil {
t.Fatalf("failed to upgrade WebSocket connection: %v", err)
return
}
handler(t, conn, response)
}))
}

// webSocketProxyHandler forwards WebSocket connections to the target server.
//
//nolint:errcheck //we can safely ignore errors checks here
func webSocketProxyHandler(targetURL string) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
defer r.Response.Body.Close()
//nolint:bodyclose // we can ignore the body close here
targetConn, _, err := websocket.DefaultDialer.Dial(targetURL, nil)
if err != nil {
http.Error(w, "failed to connect to backend WebSocket server", http.StatusBadGateway)
return
}
defer targetConn.Close()

upgrader := websocket.Upgrader{
CheckOrigin: func(r *http.Request) bool {
return true
},
}
clientConn, err := upgrader.Upgrade(w, r, nil)
if err != nil {
http.Error(w, "failed to upgrade client connection", http.StatusInternalServerError)
return
}
defer clientConn.Close()
// forward messages between client and target server
go func() {
for {
messageType, message, err := targetConn.ReadMessage()
if err != nil {
break
}
clientConn.WriteMessage(messageType, message)
}
}()
for {
messageType, message, err := clientConn.ReadMessage()
if err != nil {
break
}
targetConn.WriteMessage(messageType, message)
}
}
}

// newWebSocketProxyTestServer creates a proxy server forwarding WebSocket traffic.
func newWebSocketProxyTestServer(t *testing.T, handler WebSocketHandler, config map[string]interface{}, response []string) *httptest.Server {
backendServer := webSocketTestServer(t, handler, response)
config["url"] = "ws" + backendServer.URL[4:]
config["proxy_url"] = "ws" + backendServer.URL[4:]
return httptest.NewServer(webSocketProxyHandler(config["url"].(string)))
}
21 changes: 21 additions & 0 deletions x-pack/filebeat/input/streaming/testdata/certs/ca.crt
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
-----BEGIN CERTIFICATE-----
MIIDbTCCAlWgAwIBAgIUS/rm8sWDc2a+eD9L+q+9XQpBa5MwDQYJKoZIhvcNAQEL
BQAwRTELMAkGA1UEBhMCQVUxEzARBgNVBAgMClNvbWUtU3RhdGUxITAfBgNVBAoM
GEludGVybmV0IFdpZGdpdHMgUHR5IEx0ZDAgFw0yNDEyMDUxMjM4NThaGA8yMTI0
MTExMTEyMzg1OFowRTELMAkGA1UEBhMCQVUxEzARBgNVBAgMClNvbWUtU3RhdGUx
ITAfBgNVBAoMGEludGVybmV0IFdpZGdpdHMgUHR5IEx0ZDCCASIwDQYJKoZIhvcN
AQEBBQADggEPADCCAQoCggEBALhEaFVqFuYwSwH4GHhMeqhilC+sWXKaQP8QmaH7
HWRST8Ko6YTT9NixUL4Qs5OmzCQFavRN9qtEo4wtqCJBOEyXQG1wAHuLWIY+KOCB
twUg8fP+uYaYUOQOYNLkBz7SLlejuZYTyGxepIkc+UeJRcOE36anIPHpc2KSr3Hm
vKJxZUVpQEbJvQ7pe7+iLL4jSOfzpQNcV9S/bzTo6taZXuo+ryEPlshkU/ME5VCN
LFrU3AW2fzKW0Xa/skkW5izCiAU8KNEy84UQM6aZkJfFi9O394i97sGgYg+q36XL
sXbZ+sCXHI3CGx+pwOx0h7S8n7iJJ7BbmwM6QuLFF6bFYkkCAwEAAaNTMFEwHQYD
VR0OBBYEFEHtfvey8SdncMr7VDqA2YhtEiGYMB8GA1UdIwQYMBaAFEHtfvey8Sdn
cMr7VDqA2YhtEiGYMA8GA1UdEwEB/wQFMAMBAf8wDQYJKoZIhvcNAQELBQADggEB
AKFcAeh9yhIbkkxvXl6ebmLcj817NVjhpcvDZlKP2MVu+w/h70w+JwBktLUlZlXd
UNkKEWZyRvrdmY+YN6rwY/QI75N17bcmDWy6QnNlVJF0AJkBSdbKonCgHrZm7K25
TOKpj0QF8l7k9wr5FWHHcBw/vFF9cGZ5TO4HbnI25N/cEKgdzZFEVA5Y/Rv7GIGU
COjJG20Cr2HIKvVYoyWvN6sL7+gbzUMyjvQyGMCT7YoIqscUfrUU+T46QaOLAKa3
z91Obfmv6uTO/rsieoxVWVJ35GeHeNJkAPkr7Z1sWIrreJ/3WsecWuPPEDNDXiSV
5h0bTbbPOyEIe5ydEIbr5kA=
-----END CERTIFICATE-----
Loading
Loading