diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index bb6c4df0f627..dd4f19be3350 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -360,6 +360,7 @@ https://github.com/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff] - Add evaluation state dump debugging option to CEL input. {pull}41335[41335] - Added support for retry configuration in GCS input. {issue}11580[11580] {pull}41862[41862] - Improve S3 polling mode states registry when using list prefix option. {pull}41869[41869] +- Add support for SSL and Proxy configurations for websoket type in streaming input. {pull}41934[41934] - AWS S3 input registry cleanup for untracked s3 objects. {pull}41694[41694] - The environment variable `BEATS_AZURE_EVENTHUB_INPUT_TRACING_ENABLED: true` enables internal logs tracer for the azure-eventhub input. {issue}41931[41931] {pull}41932[41932] diff --git a/x-pack/filebeat/docs/inputs/input-streaming.asciidoc b/x-pack/filebeat/docs/inputs/input-streaming.asciidoc index b80deda9c778..9a6f67e5bc49 100644 --- a/x-pack/filebeat/docs/inputs/input-streaming.asciidoc +++ b/x-pack/filebeat/docs/inputs/input-streaming.asciidoc @@ -349,6 +349,28 @@ The minimum time to wait between retries. This ensures that retries are spaced o The maximum time to wait between retries. This prevents the retry mechanism from becoming too slow, ensuring that the client does not wait indefinitely between retries. This is crucial in systems where timeouts or user experience are critical. For example, `wait_max` might be set to 10 seconds, meaning that even if the calculated backoff is greater than this, the client will wait at most 10 seconds before retrying. +[float] +=== `timeout` +Timeout is the maximum amount of time the websocket dialer will wait for a connection to be established. The default value is `180` seconds. + +[float] +==== `proxy_url` +This specifies the forward proxy URL to use for the connection. The `proxy_url` configuration is optional and can be used to configure the proxy settings for the connection. The `proxy_url` default value is set by `http.ProxyFromEnvironment` which reads the `HTTP_PROXY`, `HTTPS_PROXY`, and `NO_PROXY` environment variables. + +[float] +==== `proxy_headers` +This specifies the headers to be sent to the proxy server. The `proxy_headers` configuration is optional and can be used to configure the headers to be sent to the proxy server. + +[float] +==== `ssl` +This specifies the SSL configuration for the connection. The `ssl` configuration is optional and can be used to configure the SSL settings for the connection. The `ssl` configuration has the following subfields: + + - `certificate_authorities`: A list of root certificates to use for verifying the server's certificate. + - `certificate`: The (PEM encoded) certificate to use for client authentication. + - `key`: The (PEM encoded) private key to use for client authentication. + +If this is a self-signed certificate, the `certificate_authorities` field should be set to the certificate itself. + [float] === Metrics diff --git a/x-pack/filebeat/input/streaming/config.go b/x-pack/filebeat/input/streaming/config.go index 67ee6e1eb318..6ccaf0c73493 100644 --- a/x-pack/filebeat/input/streaming/config.go +++ b/x-pack/filebeat/input/streaming/config.go @@ -41,9 +41,8 @@ type config struct { Redact *redact `config:"redact"` // Retry is the configuration for retrying failed connections. Retry *retry `config:"retry"` - + // Transport is the common the transport config. Transport httpcommon.HTTPTransportSettings `config:",inline"` - // CrowdstrikeAppID is the value used to set the // appId request parameter in the FalconHose stream // discovery request. @@ -166,3 +165,11 @@ func checkURLScheme(c config) error { return fmt.Errorf("unknown stream type: %s", c.Type) } } + +func defaultConfig() config { + return config{ + Transport: httpcommon.HTTPTransportSettings{ + Timeout: 180 * time.Second, + }, + } +} diff --git a/x-pack/filebeat/input/streaming/input_manager.go b/x-pack/filebeat/input/streaming/input_manager.go index f20b867755b2..c685452c34f1 100644 --- a/x-pack/filebeat/input/streaming/input_manager.go +++ b/x-pack/filebeat/input/streaming/input_manager.go @@ -34,7 +34,7 @@ func NewInputManager(log *logp.Logger, store inputcursor.StateStore) InputManage } func cursorConfigure(cfg *conf.C) ([]inputcursor.Source, inputcursor.Input, error) { - src := &source{cfg: config{}} + src := &source{cfg: defaultConfig()} if err := cfg.Unpack(&src.cfg); err != nil { return nil, nil, err } diff --git a/x-pack/filebeat/input/streaming/input_test.go b/x-pack/filebeat/input/streaming/input_test.go index 6d382bdf6645..c11784ea3dbf 100644 --- a/x-pack/filebeat/input/streaming/input_test.go +++ b/x-pack/filebeat/input/streaming/input_test.go @@ -6,6 +6,7 @@ package streaming import ( "context" + "crypto/tls" "errors" "fmt" "net/http" @@ -41,6 +42,7 @@ type WebSocketHandler func(*testing.T, *websocket.Conn, []string) var inputTests = []struct { name string server func(*testing.T, WebSocketHandler, map[string]interface{}, []string) + proxyServer func(*testing.T, WebSocketHandler, map[string]interface{}, []string) *httptest.Server handler WebSocketHandler config map[string]interface{} response []string @@ -450,6 +452,140 @@ var inputTests = []struct { }, wantErr: fmt.Errorf("failed to establish WebSocket connection after 2 attempts with error websocket: bad handshake"), }, + { + name: "single_event_tls", + server: webSocketServerWithTLS(httptest.NewUnstartedServer), + handler: defaultHandler, + config: map[string]interface{}{ + "program": ` + bytes(state.response).decode_json().as(inner_body,{ + "events": [inner_body], + })`, + "ssl": map[string]interface{}{ + "enabled": true, + "certificate_authorities": []string{"testdata/certs/ca.crt"}, + "certificate": "testdata/certs/cert.pem", + "key": "testdata/certs/key.pem", + }, + }, + response: []string{` + { + "pps": { + "agent": "example.proofpoint.com", + "cid": "mmeng_uivm071" + }, + "ts": "2017-08-17T14:54:12.949180-07:00", + "data": "2017-08-17T14:54:12.949180-07:00 example sendmail[30641]:v7HLqYbx029423: to=/dev/null, ctladdr= (8/0),delay=00:00:00, xdelay=00:00:00, mailer=*file*, tls_verify=NONE, pri=35342,dsn=2.0.0, stat=Sent", + "sm": { + "tls": { + "verify": "NONE" + }, + "stat": "Sent", + "qid": "v7HLqYbx029423", + "dsn": "2.0.0", + "mailer": "*file*", + "to": [ + "/dev/null" + ], + "ctladdr": " (8/0)", + "delay": "00:00:00", + "xdelay": "00:00:00", + "pri": 35342 + }, + "id": "ZeYGULpZmL5N0151HN1OyA" + }`}, + want: []map[string]interface{}{ + { + "pps": map[string]interface{}{ + "agent": "example.proofpoint.com", + "cid": "mmeng_uivm071", + }, + "ts": "2017-08-17T14:54:12.949180-07:00", + "data": "2017-08-17T14:54:12.949180-07:00 example sendmail[30641]:v7HLqYbx029423: to=/dev/null, ctladdr= (8/0),delay=00:00:00, xdelay=00:00:00, mailer=*file*, tls_verify=NONE, pri=35342,dsn=2.0.0, stat=Sent", + "sm": map[string]interface{}{ + "tls": map[string]interface{}{ + "verify": "NONE", + }, + "stat": "Sent", + "qid": "v7HLqYbx029423", + "dsn": "2.0.0", + "mailer": "*file*", + "to": []interface{}{ + "/dev/null", + }, + "ctladdr": " (8/0)", + "delay": "00:00:00", + "xdelay": "00:00:00", + "pri": float64(35342), + }, + "id": "ZeYGULpZmL5N0151HN1OyA", + }, + }, + }, + { + name: "basic_proxy_forwarding", + proxyServer: newWebSocketProxyTestServer, + handler: defaultHandler, + config: map[string]interface{}{ + "program": ` + bytes(state.response).decode_json().as(inner_body,{ + "events": [inner_body], + })`, + }, + response: []string{` + { + "pps": { + "agent": "example.proofpoint.com", + "cid": "mmeng_uivm071" + }, + "ts": "2017-08-17T14:54:12.949180-07:00", + "data": "2017-08-17T14:54:12.949180-07:00 example sendmail[30641]:v7HLqYbx029423: to=/dev/null, ctladdr= (8/0),delay=00:00:00, xdelay=00:00:00, mailer=*file*, tls_verify=NONE, pri=35342,dsn=2.0.0, stat=Sent", + "sm": { + "tls": { + "verify": "NONE" + }, + "stat": "Sent", + "qid": "v7HLqYbx029423", + "dsn": "2.0.0", + "mailer": "*file*", + "to": [ + "/dev/null" + ], + "ctladdr": " (8/0)", + "delay": "00:00:00", + "xdelay": "00:00:00", + "pri": 35342 + }, + "id": "ZeYGULpZmL5N0151HN1OyA" + }`}, + want: []map[string]interface{}{ + { + "pps": map[string]interface{}{ + "agent": "example.proofpoint.com", + "cid": "mmeng_uivm071", + }, + "ts": "2017-08-17T14:54:12.949180-07:00", + "data": "2017-08-17T14:54:12.949180-07:00 example sendmail[30641]:v7HLqYbx029423: to=/dev/null, ctladdr= (8/0),delay=00:00:00, xdelay=00:00:00, mailer=*file*, tls_verify=NONE, pri=35342,dsn=2.0.0, stat=Sent", + "sm": map[string]interface{}{ + "tls": map[string]interface{}{ + "verify": "NONE", + }, + "stat": "Sent", + "qid": "v7HLqYbx029423", + "dsn": "2.0.0", + "mailer": "*file*", + "to": []interface{}{ + "/dev/null", + }, + "ctladdr": " (8/0)", + "delay": "00:00:00", + "xdelay": "00:00:00", + "pri": float64(35342), + }, + "id": "ZeYGULpZmL5N0151HN1OyA", + }, + }, + }, } var urlEvalTests = []struct { @@ -560,6 +696,9 @@ func TestInput(t *testing.T) { if test.server != nil { test.server(t, test.handler, test.config, test.response) } + if test.proxyServer != nil { + test.proxyServer(t, test.handler, test.config, test.response) + } cfg := conf.MustNewConfigFrom(test.config) @@ -771,6 +910,46 @@ func webSocketServerWithRetry(serve func(http.Handler) *httptest.Server) func(*t } } +// webSocketServerWithTLS simulates a WebSocket server with TLS based authentication. +func webSocketServerWithTLS(serve func(http.Handler) *httptest.Server) func(*testing.T, WebSocketHandler, map[string]interface{}, []string) { + return func(t *testing.T, handler WebSocketHandler, config map[string]interface{}, response []string) { + server := serve(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + upgrader := websocket.Upgrader{ + CheckOrigin: func(r *http.Request) bool { + return true + }, + } + + conn, err := upgrader.Upgrade(w, r, nil) + if err != nil { + t.Fatalf("error upgrading connection to WebSocket: %v", err) + return + } + + handler(t, conn, response) + })) + //nolint:gosec // there is no need to use a secure cert for testing + server.TLS = &tls.Config{ + Certificates: []tls.Certificate{generateSelfSignedCert(t)}, + } + server.StartTLS() + + if config["url"] == nil { + config["url"] = "ws" + server.URL[4:] + } + t.Cleanup(server.Close) + } +} + +// generateSelfSignedCert returns a self-signed certificate for testing purposes based on the dummy certs in the testdata directory +func generateSelfSignedCert(t *testing.T) tls.Certificate { + cert, err := tls.LoadX509KeyPair("testdata/certs/cert.pem", "testdata/certs/key.pem") + if err != nil { + t.Fatalf("failed to generate self-signed cert: %v", err) + } + return cert +} + // defaultHandler is a default handler for WebSocket connections. func defaultHandler(t *testing.T, conn *websocket.Conn, response []string) { for _, r := range response { @@ -780,3 +959,73 @@ func defaultHandler(t *testing.T, conn *websocket.Conn, response []string) { } } } + +// webSocketTestServer creates a WebSocket target server that communicates with the proxy handler. +func webSocketTestServer(t *testing.T, handler WebSocketHandler, response []string) *httptest.Server { + return httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + upgrader := websocket.Upgrader{ + CheckOrigin: func(r *http.Request) bool { + return true + }, + } + conn, err := upgrader.Upgrade(w, r, nil) + if err != nil { + t.Fatalf("failed to upgrade WebSocket connection: %v", err) + return + } + handler(t, conn, response) + })) +} + +// webSocketProxyHandler forwards WebSocket connections to the target server. +// +//nolint:errcheck //we can safely ignore errors checks here +func webSocketProxyHandler(targetURL string) http.HandlerFunc { + return func(w http.ResponseWriter, r *http.Request) { + defer r.Response.Body.Close() + //nolint:bodyclose // we can ignore the body close here + targetConn, _, err := websocket.DefaultDialer.Dial(targetURL, nil) + if err != nil { + http.Error(w, "failed to connect to backend WebSocket server", http.StatusBadGateway) + return + } + defer targetConn.Close() + + upgrader := websocket.Upgrader{ + CheckOrigin: func(r *http.Request) bool { + return true + }, + } + clientConn, err := upgrader.Upgrade(w, r, nil) + if err != nil { + http.Error(w, "failed to upgrade client connection", http.StatusInternalServerError) + return + } + defer clientConn.Close() + // forward messages between client and target server + go func() { + for { + messageType, message, err := targetConn.ReadMessage() + if err != nil { + break + } + clientConn.WriteMessage(messageType, message) + } + }() + for { + messageType, message, err := clientConn.ReadMessage() + if err != nil { + break + } + targetConn.WriteMessage(messageType, message) + } + } +} + +// newWebSocketProxyTestServer creates a proxy server forwarding WebSocket traffic. +func newWebSocketProxyTestServer(t *testing.T, handler WebSocketHandler, config map[string]interface{}, response []string) *httptest.Server { + backendServer := webSocketTestServer(t, handler, response) + config["url"] = "ws" + backendServer.URL[4:] + config["proxy_url"] = "ws" + backendServer.URL[4:] + return httptest.NewServer(webSocketProxyHandler(config["url"].(string))) +} diff --git a/x-pack/filebeat/input/streaming/testdata/certs/ca.crt b/x-pack/filebeat/input/streaming/testdata/certs/ca.crt new file mode 100644 index 000000000000..43e187f1367f --- /dev/null +++ b/x-pack/filebeat/input/streaming/testdata/certs/ca.crt @@ -0,0 +1,21 @@ +-----BEGIN CERTIFICATE----- +MIIDbTCCAlWgAwIBAgIUS/rm8sWDc2a+eD9L+q+9XQpBa5MwDQYJKoZIhvcNAQEL +BQAwRTELMAkGA1UEBhMCQVUxEzARBgNVBAgMClNvbWUtU3RhdGUxITAfBgNVBAoM +GEludGVybmV0IFdpZGdpdHMgUHR5IEx0ZDAgFw0yNDEyMDUxMjM4NThaGA8yMTI0 +MTExMTEyMzg1OFowRTELMAkGA1UEBhMCQVUxEzARBgNVBAgMClNvbWUtU3RhdGUx +ITAfBgNVBAoMGEludGVybmV0IFdpZGdpdHMgUHR5IEx0ZDCCASIwDQYJKoZIhvcN +AQEBBQADggEPADCCAQoCggEBALhEaFVqFuYwSwH4GHhMeqhilC+sWXKaQP8QmaH7 +HWRST8Ko6YTT9NixUL4Qs5OmzCQFavRN9qtEo4wtqCJBOEyXQG1wAHuLWIY+KOCB +twUg8fP+uYaYUOQOYNLkBz7SLlejuZYTyGxepIkc+UeJRcOE36anIPHpc2KSr3Hm +vKJxZUVpQEbJvQ7pe7+iLL4jSOfzpQNcV9S/bzTo6taZXuo+ryEPlshkU/ME5VCN +LFrU3AW2fzKW0Xa/skkW5izCiAU8KNEy84UQM6aZkJfFi9O394i97sGgYg+q36XL +sXbZ+sCXHI3CGx+pwOx0h7S8n7iJJ7BbmwM6QuLFF6bFYkkCAwEAAaNTMFEwHQYD +VR0OBBYEFEHtfvey8SdncMr7VDqA2YhtEiGYMB8GA1UdIwQYMBaAFEHtfvey8Sdn +cMr7VDqA2YhtEiGYMA8GA1UdEwEB/wQFMAMBAf8wDQYJKoZIhvcNAQELBQADggEB +AKFcAeh9yhIbkkxvXl6ebmLcj817NVjhpcvDZlKP2MVu+w/h70w+JwBktLUlZlXd +UNkKEWZyRvrdmY+YN6rwY/QI75N17bcmDWy6QnNlVJF0AJkBSdbKonCgHrZm7K25 +TOKpj0QF8l7k9wr5FWHHcBw/vFF9cGZ5TO4HbnI25N/cEKgdzZFEVA5Y/Rv7GIGU +COjJG20Cr2HIKvVYoyWvN6sL7+gbzUMyjvQyGMCT7YoIqscUfrUU+T46QaOLAKa3 +z91Obfmv6uTO/rsieoxVWVJ35GeHeNJkAPkr7Z1sWIrreJ/3WsecWuPPEDNDXiSV +5h0bTbbPOyEIe5ydEIbr5kA= +-----END CERTIFICATE----- diff --git a/x-pack/filebeat/input/streaming/testdata/certs/cert.pem b/x-pack/filebeat/input/streaming/testdata/certs/cert.pem new file mode 100644 index 000000000000..43e187f1367f --- /dev/null +++ b/x-pack/filebeat/input/streaming/testdata/certs/cert.pem @@ -0,0 +1,21 @@ +-----BEGIN CERTIFICATE----- +MIIDbTCCAlWgAwIBAgIUS/rm8sWDc2a+eD9L+q+9XQpBa5MwDQYJKoZIhvcNAQEL +BQAwRTELMAkGA1UEBhMCQVUxEzARBgNVBAgMClNvbWUtU3RhdGUxITAfBgNVBAoM +GEludGVybmV0IFdpZGdpdHMgUHR5IEx0ZDAgFw0yNDEyMDUxMjM4NThaGA8yMTI0 +MTExMTEyMzg1OFowRTELMAkGA1UEBhMCQVUxEzARBgNVBAgMClNvbWUtU3RhdGUx +ITAfBgNVBAoMGEludGVybmV0IFdpZGdpdHMgUHR5IEx0ZDCCASIwDQYJKoZIhvcN +AQEBBQADggEPADCCAQoCggEBALhEaFVqFuYwSwH4GHhMeqhilC+sWXKaQP8QmaH7 +HWRST8Ko6YTT9NixUL4Qs5OmzCQFavRN9qtEo4wtqCJBOEyXQG1wAHuLWIY+KOCB +twUg8fP+uYaYUOQOYNLkBz7SLlejuZYTyGxepIkc+UeJRcOE36anIPHpc2KSr3Hm +vKJxZUVpQEbJvQ7pe7+iLL4jSOfzpQNcV9S/bzTo6taZXuo+ryEPlshkU/ME5VCN +LFrU3AW2fzKW0Xa/skkW5izCiAU8KNEy84UQM6aZkJfFi9O394i97sGgYg+q36XL +sXbZ+sCXHI3CGx+pwOx0h7S8n7iJJ7BbmwM6QuLFF6bFYkkCAwEAAaNTMFEwHQYD +VR0OBBYEFEHtfvey8SdncMr7VDqA2YhtEiGYMB8GA1UdIwQYMBaAFEHtfvey8Sdn +cMr7VDqA2YhtEiGYMA8GA1UdEwEB/wQFMAMBAf8wDQYJKoZIhvcNAQELBQADggEB +AKFcAeh9yhIbkkxvXl6ebmLcj817NVjhpcvDZlKP2MVu+w/h70w+JwBktLUlZlXd +UNkKEWZyRvrdmY+YN6rwY/QI75N17bcmDWy6QnNlVJF0AJkBSdbKonCgHrZm7K25 +TOKpj0QF8l7k9wr5FWHHcBw/vFF9cGZ5TO4HbnI25N/cEKgdzZFEVA5Y/Rv7GIGU +COjJG20Cr2HIKvVYoyWvN6sL7+gbzUMyjvQyGMCT7YoIqscUfrUU+T46QaOLAKa3 +z91Obfmv6uTO/rsieoxVWVJ35GeHeNJkAPkr7Z1sWIrreJ/3WsecWuPPEDNDXiSV +5h0bTbbPOyEIe5ydEIbr5kA= +-----END CERTIFICATE----- diff --git a/x-pack/filebeat/input/streaming/testdata/certs/key.pem b/x-pack/filebeat/input/streaming/testdata/certs/key.pem new file mode 100644 index 000000000000..1b6ce6bade3c --- /dev/null +++ b/x-pack/filebeat/input/streaming/testdata/certs/key.pem @@ -0,0 +1,28 @@ +-----BEGIN PRIVATE KEY----- +MIIEvQIBADANBgkqhkiG9w0BAQEFAASCBKcwggSjAgEAAoIBAQC4RGhVahbmMEsB ++Bh4THqoYpQvrFlymkD/EJmh+x1kUk/CqOmE0/TYsVC+ELOTpswkBWr0TfarRKOM +LagiQThMl0BtcAB7i1iGPijggbcFIPHz/rmGmFDkDmDS5Ac+0i5Xo7mWE8hsXqSJ +HPlHiUXDhN+mpyDx6XNikq9x5ryicWVFaUBGyb0O6Xu/oiy+I0jn86UDXFfUv280 +6OrWmV7qPq8hD5bIZFPzBOVQjSxa1NwFtn8yltF2v7JJFuYswogFPCjRMvOFEDOm +mZCXxYvTt/eIve7BoGIPqt+ly7F22frAlxyNwhsfqcDsdIe0vJ+4iSewW5sDOkLi +xRemxWJJAgMBAAECggEAQprvf5hWaKQiKLcN2UYDvCPN3qGUv3kEb24HqmZDjIS4 +MeuuZQXcZgtJ3TnaP0+2UHro2x/nPqcT2tKSCLe8aurtLeGjOwT2XafQTL52clMj +Qgfb9cvOyXBtDS3BdLKyb5lNtvK1qn5XSPyBGpuC7RZ1ZR7aKLcyrvnIkwpNOwXW +zH5F6pI6HAUPfgYcHfIkQ5kuPCRcvfmv6m9XLYlmiQNkReQ2fWtFF6517R6FGtZu +Z8F0pFz8VtIGQoamX9vEwQhYqBK67msl9gnKjyH3ONckkSRMVagXrwx9F5o+NeRD +IgDFnjH1HgLCXmeCa7BN+eYfGMZ24xisItD7XBzGtQKBgQDlmEncIgpkmtXZSvXs +r5i7epJDDcxC8/ZObsn3zI01t4nmI9+phu7a4fAA+AUP7+HFVdi22JcHDkHZ5J1a +93t+Tcc4yzXk8FovaavRRvJNXv3WhHvgNpe1tvgyMUc8p9QpfbHuafVhQN2qWivh +nnEWagBoguiXaXEIFRXFK6dt5wKBgQDNdZYQ4/Am7HNjI2vqmMccZxuSufUX0xxM +LuDY8UAsPbgRN8wqfY67xCdMztax5y15gF9UPw0hHMlk4m2J4lsqCqgRRWXnTser +rNAseZ1j7MZY0cKACRxvXNtPKmmHKvYHUEZkADT/HrhfZWIce7KEXaxdoGTkssPd +9/WbahLITwKBgFBa2VbTDyIg0sGHK8UXu/O5tWEEfj3clpLi0YsJq05mmzvRyGDT +2dr/gnlEVLk8Mp9XKU7tRQZyJff1vGDvBuiwng4xiP5EZLv9VuYa14jeuyaOHbDe +SoCNthYTCySedHHFDTYtHXVZN3t8raj8RAYdOWFal78OZ0H15zWnzqR3AoGBAJne +mxFxM3RjFpNDftmFq3BpA6xiGdzK3OFtJjUykAXR/xzd9chImfGjGG+cZAt9/3+E +FWCpi7KltWoZbUGbRPz6WB3/JC8Tv9OhK5JzTdz9ARqZlRmAOUxpdVEXiUqScQjP +JLhVs1rw7dF7wvtj5DDfWmwP6B+iha+huM24pfJfAoGAUYeJQ1XqEx+0+b6xrtCm +qxPiiGnJSi4J9CGaZRMTG5qFSQ5jCaWTJSWdiZJgyBMnmQsiuPacefAg91z/NjJC +xM0/sKLe/yPWP1UlwrCl1MDjMwIl/qtWiKXYXpY1qcOONLCFc6OASd1cTOtX/7Km +2g49JWZEY71f7DxLdcWfqWM= +-----END PRIVATE KEY----- diff --git a/x-pack/filebeat/input/streaming/websocket.go b/x-pack/filebeat/input/streaming/websocket.go index 0c8de94f5ad3..8a78757f0e96 100644 --- a/x-pack/filebeat/input/streaming/websocket.go +++ b/x-pack/filebeat/input/streaming/websocket.go @@ -7,6 +7,7 @@ package streaming import ( "bytes" "context" + "crypto/tls" "errors" "fmt" "io" @@ -14,6 +15,7 @@ import ( "math/rand/v2" "net" "net/http" + "net/url" "strings" "time" @@ -22,6 +24,8 @@ import ( inputcursor "github.com/elastic/beats/v7/filebeat/input/v2/input-cursor" "github.com/elastic/elastic-agent-libs/logp" + "github.com/elastic/elastic-agent-libs/transport/httpcommon" + "github.com/elastic/elastic-agent-libs/transport/tlscommon" ) type websocketStream struct { @@ -220,14 +224,18 @@ func connectWebSocket(ctx context.Context, cfg config, url string, log *logp.Log var response *http.Response var err error headers := formHeader(cfg) - + dialer, err := createWebSocketDialer(cfg) + if err != nil { + return nil, nil, err + } if cfg.Retry != nil { retryConfig := cfg.Retry for attempt := 1; attempt <= retryConfig.MaxAttempts; attempt++ { - conn, response, err = websocket.DefaultDialer.DialContext(ctx, url, headers) + conn, response, err = dialer.DialContext(ctx, url, headers) if err == nil { return conn, response, nil } + //nolint:errorlint // it will never be a wrapped error at this point if err == websocket.ErrBadHandshake { log.Errorf("attempt %d: webSocket connection failed with bad handshake (status %d) retrying...\n", attempt, response.StatusCode) continue @@ -239,7 +247,7 @@ func connectWebSocket(ctx context.Context, cfg config, url string, log *logp.Log return nil, nil, fmt.Errorf("failed to establish WebSocket connection after %d attempts with error %w", retryConfig.MaxAttempts, err) } - return websocket.DefaultDialer.DialContext(ctx, url, headers) + return dialer.DialContext(ctx, url, headers) } // calculateWaitTime calculates the wait time for the next attempt based on the exponential backoff algorithm. @@ -269,3 +277,42 @@ func (s *websocketStream) Close() error { s.metrics.Close() return nil } + +func createWebSocketDialer(cfg config) (*websocket.Dialer, error) { + var tlsConfig *tls.Config + dialer := &websocket.Dialer{ + Proxy: http.ProxyFromEnvironment, + } + + // load proxy configuration if available + if cfg.Transport.Proxy.URL != nil { + var proxy func(*http.Request) (*url.URL, error) + proxyURL, err := httpcommon.NewProxyURIFromString(cfg.Transport.Proxy.URL.String()) + if err != nil { + return nil, fmt.Errorf("failed to parse proxy URL: %w", err) + } + // create a custom HTTP Transport with proxy configuration + proxyTransport := &http.Transport{ + Proxy: http.ProxyURL(proxyURL.URI()), + ProxyConnectHeader: cfg.Transport.Proxy.Headers.Headers(), + DialContext: (&net.Dialer{ + Timeout: cfg.Transport.Timeout, + }).DialContext, + } + dialer.NetDialContext = func(ctx context.Context, network, addr string) (net.Conn, error) { + return proxyTransport.DialContext(ctx, network, addr) + } + dialer.Proxy = proxy + } + // load TLS config if available + if cfg.Transport.TLS != nil { + TLSConfig, err := tlscommon.LoadTLSConfig(cfg.Transport.TLS) + if err != nil { + return nil, fmt.Errorf("failed to load TLS config: %w", err) + } + tlsConfig = TLSConfig.ToConfig() + dialer.TLSClientConfig = tlsConfig + } + + return dialer, nil +}