Skip to content

Commit

Permalink
[filebeat][input] - Websocket Input with CEL engine (#37774)
Browse files Browse the repository at this point in the history
* initial commit

* working version of websocket input with cel engine

* updated go mod and NOTICE

* added regex support to cel engine, added all metrics params to relevant code blocks

* removed unused config from input struct

* added redactor

* removed cel references in logs

* addressed draft PR suggestions and added more metrics

* added tests

* added retry function

* updated changelog

* updated tests

* addressed PR suggestions, removed auto retry mechanism for the moment

* addressed PR suggestions, removed auto retry mechanism for the moment

* added documentation and updated codeowners

* updated experimental tags

* added a new test, cleaned up some code and logic

* added cursor condition check test and updated filebeat-options asciidoc

* added auth tests, removed api-key config and updated it to custom auth to be more generic, added context in Dial method

* addressed PR suggestions and added config tests

* Update x-pack/filebeat/docs/inputs/input-websocket.asciidoc

Co-authored-by: Dan Kortschak <[email protected]>

* Update x-pack/filebeat/docs/inputs/input-websocket.asciidoc

Co-authored-by: Dan Kortschak <[email protected]>

* Update x-pack/filebeat/docs/inputs/input-websocket.asciidoc

Co-authored-by: Dan Kortschak <[email protected]>

* added debug log for cel state before cel eval, updated the docs accordingly

* updated URL config docs

---------

Co-authored-by: Dan Kortschak <[email protected]>
  • Loading branch information
ShourieG and efd6 authored Feb 12, 2024
1 parent 3345106 commit e5000b6
Show file tree
Hide file tree
Showing 16 changed files with 1,998 additions and 33 deletions.
1 change: 1 addition & 0 deletions .github/CODEOWNERS
Validating CODEOWNERS rules …
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,7 @@ CHANGELOG*
/x-pack/filebeat/input/lumberjack/ @elastic/security-service-integrations
/x-pack/filebeat/input/netflow/ @elastic/sec-deployment-and-devices
/x-pack/filebeat/input/o365audit/ @elastic/security-service-integrations
/x-pack/filebeat/input/websocket/ @elastic/security-service-integrations
/x-pack/filebeat/module/activemq @elastic/obs-infraobs-integrations
/x-pack/filebeat/module/aws @elastic/obs-cloud-monitoring
/x-pack/filebeat/module/awsfargate @elastic/obs-cloud-monitoring
Expand Down
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,7 @@ Setting environmental variable ELASTIC_NETINFO:false in Elastic Agent pod will d
- Relax TCP/UDP metric polling expectations to improve metric collection. {pull}37714[37714]
- Add support for PEM-based Okta auth in HTTPJSON. {pull}37772[37772]
- Prevent complete loss of long request trace data. {issue}37826[37826] {pull}37836[37836]
- Added experimental version of the Websocket Input. {pull}37774[37774]
- Add support for PEM-based Okta auth in CEL. {pull}37813[37813]

*Auditbeat*
Expand Down
64 changes: 32 additions & 32 deletions NOTICE.txt
Original file line number Diff line number Diff line change
Expand Up @@ -18447,6 +18447,38 @@ THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.


--------------------------------------------------------------------------------
Dependency : github.com/gorilla/websocket
Version: v1.4.2
Licence type (autodetected): BSD-2-Clause
--------------------------------------------------------------------------------

Contents of probable licence file $GOMODCACHE/github.com/gorilla/[email protected]/LICENSE:

Copyright (c) 2013 The Gorilla WebSocket Authors. All rights reserved.

Redistribution and use in source and binary forms, with or without
modification, are permitted provided that the following conditions are met:

Redistributions of source code must retain the above copyright notice, this
list of conditions and the following disclaimer.

Redistributions in binary form must reproduce the above copyright notice,
this list of conditions and the following disclaimer in the documentation
and/or other materials provided with the distribution.

THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.


--------------------------------------------------------------------------------
Dependency : github.com/h2non/filetype
Version: v1.1.1
Expand Down Expand Up @@ -41594,38 +41626,6 @@ THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.


--------------------------------------------------------------------------------
Dependency : github.com/gorilla/websocket
Version: v1.4.2
Licence type (autodetected): BSD-2-Clause
--------------------------------------------------------------------------------

Contents of probable licence file $GOMODCACHE/github.com/gorilla/[email protected]/LICENSE:

Copyright (c) 2013 The Gorilla WebSocket Authors. All rights reserved.

Redistribution and use in source and binary forms, with or without
modification, are permitted provided that the following conditions are met:

Redistributions of source code must retain the above copyright notice, this
list of conditions and the following disclaimer.

Redistributions in binary form must reproduce the above copyright notice,
this list of conditions and the following disclaimer in the documentation
and/or other materials provided with the distribution.

THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.


--------------------------------------------------------------------------------
Dependency : github.com/hashicorp/cronexpr
Version: v1.1.0
Expand Down
3 changes: 3 additions & 0 deletions filebeat/docs/filebeat-options.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ You can configure {beatname_uc} to use the following inputs:
* <<{beatname_lc}-input-tcp>>
* <<{beatname_lc}-input-udp>>
* <<{beatname_lc}-input-gcs>>
* <<{beatname_lc}-input-websocket>>

include::multiline.asciidoc[]

Expand Down Expand Up @@ -145,3 +146,5 @@ include::inputs/input-udp.asciidoc[]
include::inputs/input-unix.asciidoc[]

include::../../x-pack/filebeat/docs/inputs/input-gcs.asciidoc[]

include::../../x-pack/filebeat/docs/inputs/input-websocket.asciidoc[]
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -213,6 +213,7 @@ require (
github.com/googleapis/gax-go/v2 v2.12.0
github.com/gorilla/handlers v1.5.1
github.com/gorilla/mux v1.8.0
github.com/gorilla/websocket v1.4.2
github.com/icholy/digest v0.1.22
github.com/lestrrat-go/jwx/v2 v2.0.19
github.com/otiai10/copy v1.12.0
Expand Down Expand Up @@ -299,7 +300,6 @@ require (
github.com/google/shlex v0.0.0-20191202100458-e7afc7fbc510 // indirect
github.com/googleapis/enterprise-certificate-proxy v0.2.4 // indirect
github.com/googleapis/gnostic v0.5.5 // indirect
github.com/gorilla/websocket v1.4.2 // indirect
github.com/hashicorp/cronexpr v1.1.0 // indirect
github.com/hashicorp/errwrap v1.1.0 // indirect
github.com/hashicorp/go-cleanhttp v0.5.2 // indirect
Expand Down
272 changes: 272 additions & 0 deletions x-pack/filebeat/docs/inputs/input-websocket.asciidoc
Original file line number Diff line number Diff line change
@@ -0,0 +1,272 @@
[role="xpack"]

:type: websocket
:mito_version: v1.8.0
:mito_docs: https://pkg.go.dev/github.com/elastic/mito@{mito_version}

[id="{beatname_lc}-input-{type}"]
=== Websocket Input
experimental[]

The `websocket` input reads messages from a websocket server or api endpoint. This input uses the `CEL engine` and the `mito` library interally to parse and process the messages. Having support for `CEL` allows you to parse and process the messages in a more flexible way. It has many similarities with the `cel` input as to how the `CEL` programs are written but deviates in the way the messages are read and processed. The `websocket` input is a `streaming` input and can only be used to read messages from a websocket server or api endpoint.

This input supports:

* Auth
** Basic
** Bearer
** Custom

NOTE: The `websocket` input as of now does not support XML messages. Auto-reconnects are also not supported at the moment so reconnection will occur on input restart.

==== Execution

The execution environment provided for the input includes includes the functions, macros, and global variables provided by the mito library.
A single JSON object is provided as an input accessible through a `state` variable. `state` contains a `response` map field and may contain arbitrary other fields configured via the input's `state` configuration. If the CEL program saves cursor states between executions of the program, the configured `state.cursor` value will be replaced by the saved cursor prior to execution.

On start the `state` will be something like this:

["source","json",subs="attributes"]
----
{
"response": { ... },
"cursor": { ... },
...
}
----
The `websocket` input creates a `response` field in the state map and attaches the websocket message to this field. All `CEL` programs written should act on this `response` field. Additional fields may be present at the root of the object and if the program tolerates it, the cursor value may be absent. Only the cursor is persisted over restarts, but all fields in state are retained between iterations of the processing loop except for the produced events array, see below.

If the cursor is present the program should process or filter out responses based on its value. If cursor is not present all responses should be processed as per the program's logic.

After completion of a program's execution it should return a single object with a structure looking like this:

["source","json",subs="attributes"]
----
{
"events": [ <1>
{...},
...
],
"cursor": [ <2>
{...},
...
]
}
----

<1> The `events` field must be present, but may be empty or null. If it is not empty, it must only have objects as elements.
The field could be an array or a single object that will be treated as an array with a single element. This depends completely on the websocket server or api endpoint. The `events` field is the array of events to be published to the output. Each event must be a JSON object.

<2> If `cursor` is present it must be either be a single object or an array with the same length as events; each element _i_ of the `cursor` will be the details for obtaining the events at and beyond event _i_ in the `events` array. If the `cursor` is a single object, it will be the details for obtaining events after the last event in the `events` array and will only be retained on successful publication of all the events in the `events` array.


Example configuration:

["source","yaml",subs="attributes"]
----
filebeat.inputs:
# Read and process simple websocket messages from a local websocket server
- type: websocket
url: ws://localhost:443/v1/stream
program: |
bytes(state.response).decode_json().as(inner_body,{
"events": {
"message": inner_body.encode_json(),
}
})
----

==== Debug state logging

The Websocket input will log the complete state when logging at the DEBUG level before and after CEL evaluation.
This will include any sensitive or secret information kept in the `state` object, and so DEBUG level logging should not be used in production when sensitive information is retained in the `state` object. See <<websocket-state-redact,`redact`>> configuration parameters for settings to exclude sensitive fields from DEBUG logs.

==== Authentication
The Websocket input supports authentication via Basic token authentication, Bearer token authentication and authentication via a custom auth config. Unlike REST inputs Basic Authentication contains a basic auth token, Bearer Authentication contains a bearer token and custom auth contains any combination of custom header and value. These token/key values are are added to the request headers and are not exposed to the `state` object. The custom auth configuration is useful for constructing requests that require custom headers and values for authentication. The basic and bearer token configurations will always use the `Authorization` header and prepend the token with `Basic` or `Bearer` respectively.

Example configurations with authentication:

["source","yaml",subs="attributes"]
----
filebeat.inputs:
- type: websocket
auth.basic_token: "dXNlcjpwYXNzd29yZA=="
url: wss://localhost:443/_stream
----

["source","yaml",subs="attributes"]
----
filebeat.inputs:
- type: websocket
auth.bearer_token: "dXNlcjpwYXNzd29yZA=="
url: wss://localhost:443/_stream
----

["source","yaml",subs="attributes"]
----
filebeat.inputs:
- type: websocket
auth.custom:
header: "x-api-key"
value: "dXNlcjpwYXNzd29yZA=="
url: wss://localhost:443/_stream
----

["source","yaml",subs="attributes"]
----
filebeat.inputs:
- type: websocket
auth.custom:
header: "Auth"
value: "Bearer dXNlcjpwYXNzd29yZA=="
url: wss://localhost:443/_stream
----

[[input-state-websocket]]
==== Input state

The `websocket` input keeps a runtime state between every message received. This state can be accessed by the CEL program and may contain arbitrary objects.
The state must contain a `response` map and may contain any object the user wishes to store in it. All objects are stored at runtime, except `cursor`, which has values that are persisted between restarts.

==== Configuration options

The `websocket` input supports the following configuration options plus the
<<{beatname_lc}-input-{type}-common-options>> described later.

[[program-websocket]]
[float]
==== `program`

The CEL program that is executed on each message received. This field should ideally be present but if not the default program given below is used.

["source","yaml",subs="attributes"]
----
program: |
bytes(state.response).decode_json().as(inner_body,{
"events": {
"message": inner_body.encode_json(),
}
})
----

[[state-websocket]]
[float]
==== `state`

`state` is an optional object that is passed to the CEL program on the first execution. It is available to the executing program as the `state` variable. Except for the `state.cursor` field, `state` does not persist over restarts.

[[cursor-websocket]]
[float]
==== `state.cursor`

The cursor is an object available as `state.cursor` where arbitrary values may be stored. Cursor state is kept between input restarts and updated after each event of a request has been published. When a cursor is used the CEL program must either create a cursor state for each event that is returned by the program, or a single cursor that reflects the cursor for completion of the full set of events.

["source","yaml",subs="attributes"]
----
filebeat.inputs:
# Read and process simple websocket messages from a local websocket server
- type: websocket
url: ws://localhost:443/v1/stream
program: |
bytes(state.response).as(body, {
"events": [body.decode_json().with({
"last_requested_at": has(state.cursor) && has(state.cursor.last_requested_at) ?
state.cursor.last_requested_at
:
now
})],
"cursor": {"last_requested_at": now}
})
----

[[regexp-websocket]]
[float]
==== `regexp`

A set of named regular expressions that may be used during a CEL program's execution using the `regexp` extension library. The syntax used for the regular expressions is https://github.com/google/re2/wiki/Syntax[RE2].

["source","yaml",subs="attributes"]
----
filebeat.inputs:
- type: websocket
# Define two regular expressions, 'products' and 'solutions' for use during CEL program execution.
regexp:
products: '(?i)(Elasticsearch|Beats|Logstash|Kibana)'
solutions: '(?i)(Search|Observability|Security)'
----

[[websocket-state-redact]]
[float]
==== `redact`

During debug level logging, the `state` object and the resulting evaluation result are included in logs. This may result in leaking of secrets. In order to prevent this, fields may be redacted or deleted from the logged `state`. The `redact` configuration allows users to configure this field redaction behaviour. For safety reasons if the `redact` configuration is missing a warning is logged.

In the case of no-required redaction an empty `redact.fields` configuration should be used to silence the logged warning.

["source","yaml",subs="attributes"]
----
- type: websocket
redact:
fields: ~
----

As an example, if a user-constructed Basic Authentication request is used in a CEL program the password can be redacted like so

["source","yaml",subs="attributes"]
----
filebeat.inputs:
- type: websocket
url: ws://localhost:443/_stream
state:
user: [email protected]
password: P@$$W0₹D
redact:
fields:
- password
delete: true
----

Note that fields under the `auth` configuration hierarchy are not exposed to the `state` and so do not need to be redacted. For this reason it is preferable to use these for authentication over the request construction shown above where possible.

[float]
==== `redact.fields`

This specifies fields in the `state` to be redacted prior to debug logging. Fields listed in this array will be either replaced with a `*` or deleted entirely from messages sent to debug logs.

[float]
==== `redact.delete`

This specifies whether fields should be replaced with a `*` or deleted entirely from messages sent to debug logs. If delete is `true`, fields will be deleted rather than replaced.

[float]
=== Metrics

This input exposes metrics under the <<http-endpoint, HTTP monitoring endpoint>>.
These metrics are exposed under the `/inputs` path. They can be used to
observe the activity of the input.

[options="header"]
|=======
| Metric | Description
| `url` | URL of the input resource.
| `cel_eval_errors` | Number of errors encountered during cel program evaluation.
| `errors_total` | Number of errors encountered over the life cycle of the input.
| `batches_received_total` | Number of event arrays received.
| `batches_published_total` | Number of event arrays published.
| `received_bytes_total` | Number of bytes received over the life cycle of the input.
| `events_received_total` | Number of events received.
| `events_published_total` | Number of events published.
| `cel_processing_time` | Histogram of the elapsed successful CEL program processing times in nanoseconds.
| `batch_processing_time` | Histogram of the elapsed successful batch processing times in nanoseconds (time of receipt to time of ACK for non-empty batches).
|=======

==== Developer tools

A stand-alone CEL environment that implements the majority of the websocket input's Comment Expression Language functionality is available in the https://github.com/elastic/mito[Elastic Mito] repository. This tool may be used to help develop CEL programs to be used by the input. Installation is available from source by running `go install github.com/elastic/mito/cmd/mito@latest` and requires a Go toolchain.

[id="{beatname_lc}-input-{type}-common-options"]
include::../../../../filebeat/docs/inputs/input-common-options.asciidoc[]

NOTE: The `websocket` input is currently tagged as experimental and might have bugs and other issues. Please report any issues on the https://github.com/elastic/beats[Github] repository.

:type!:
Loading

0 comments on commit e5000b6

Please sign in to comment.