Skip to content

Commit

Permalink
websocket: new generic integration (#9926)
Browse files Browse the repository at this point in the history
* New Generic Integration
1. This makes the Filebeat Websocket input available as an integration package.

* Updated changelog entry.

* Resolved comments
1. Changed the title as suggested in Readme and manifest.
2. Added support for state, regexp, redact and auth support in the hbs file.
3. Added another scenario for system test.

* Resolved comments
1. Remove default value of URL from manifest.
2. Reverse the order of function declarations.
3. Added copyright header.
4. Removed note from readme and update screenshot.

* Fixed the system test as suggested
* added health check to mock-service container

---------

Co-authored-by: Shourie Ganguly <[email protected]>
  • Loading branch information
muskan-agarwal26 and ShourieG authored Jul 17, 2024
1 parent 74a1d90 commit 9b26696
Show file tree
Hide file tree
Showing 18 changed files with 465 additions and 0 deletions.
1 change: 1 addition & 0 deletions .github/CODEOWNERS
Validating CODEOWNERS rules …
Original file line number Diff line number Diff line change
Expand Up @@ -381,6 +381,7 @@
/packages/universal_profiling_symbolizer @elastic/obs-ds-intake-services
/packages/vectra_detect @elastic/security-service-integrations
/packages/vsphere @elastic/obs-infraobs-integrations
/packages/websocket @elastic/security-service-integrations
/packages/watchguard_firebox @elastic/sec-deployment-and-devices
/packages/websphere_application_server @elastic/obs-infraobs-integrations
/packages/windows @elastic/elastic-agent-data-plane @elastic/sec-windows-platform
Expand Down
4 changes: 4 additions & 0 deletions packages/websocket/_dev/build/build.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
dependencies:
ecs:
reference: [email protected]
import_mappings: true
27 changes: 27 additions & 0 deletions packages/websocket/_dev/build/docs/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# Custom WebSocket Input

The WebSocket input integration enables ingestion of real-time data from WebSocket servers. WebSockets provide a full-duplex communication channel over a single, long-lived connection, which makes it suitable for scenarios where low latency data transmission is required.

This input type connects to a WebSocket URL, listens for messages sent by the server, and processes these messages as they arrive. The data is then published to Elasticsearch, making it searchable and analyzable in near real-time.

## Configuration

The full documentation for configuring the WebSocket input can be found [here](https://www.elastic.co/guide/en/beats/filebeat/current/filebeat-input-websocket.html).

To configure the WebSocket input, specify the connection URL and other optional parameters such as headers for authentication or protocol versions. Advanced options for connection handling, such as timeouts and subprotocols, can be configured in the "Advanced options" section.

### Example Configuration

Here is a basic example of how to configure the WebSocket input:

![Configuration Page](../img/websocket_configuration.png)

This configuration establishes a WebSocket connection to ws://localhost:443/v1/stream and uses basic authentication.

## Data Processing

The WebSocket input will consume messages from the server as they are transmitted. These messages are expected to be in a format that Filebeat can process, such as JSON. If the message format is different, you may need to define a processor to parse and structure the data before it is sent to Elasticsearch.

## Connection Management

The WebSocket input manages the connection to the WebSocket server, including automatically reconnecting if the connection is lost. The input does not maintain any state between restarts, so if the server sends historical data, it will be re-ingested upon reconnection.
15 changes: 15 additions & 0 deletions packages/websocket/_dev/deploy/docker/docker-compose.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
version: "2.3"
services:
websocket-mock-service:
image: golang:1.21-alpine
hostname: websocket
working_dir: /app
volumes:
- ./websocket-mock-service:/app
ports:
- "3000:3000"
healthcheck:
test: "wget --no-verbose --tries=1 --spider http://localhost:3000/health || exit 1"
interval: 10s
timeout: 5s
command: ["go", "run", "main.go"]
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
module websocket-mock-service

go 1.21.3

require github.com/gorilla/websocket v1.5.1

require golang.org/x/net v0.17.0 // indirect
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
github.com/gorilla/websocket v1.5.1 h1:gmztn0JnHVt9JZquRuzLw3g4wouNVzKL15iLr/zn/QY=
github.com/gorilla/websocket v1.5.1/go.mod h1:x3kM2JMyaluk02fnUJpQuwD2dCS5NDG2ZHL0uE0tcaY=
golang.org/x/net v0.17.0 h1:pVaXccu2ozPjCXewfr1S7xza/zcXTity9cCdXQYSjIM=
golang.org/x/net v0.17.0/go.mod h1:NxSsAGuq816PNPmqtQdLE42eU2Fs7NoRIZrHJAlaCOE=
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License;
// you may not use this file except in compliance with the Elastic License.

package main

import (
"log"
"net/http"

"github.com/gorilla/websocket"
)

func main() {
http.HandleFunc("/", handleWebSocket)
log.Fatal(http.ListenAndServe(":3000", nil))
}

func handleWebSocket(w http.ResponseWriter, r *http.Request) {

if r.URL.Path == "/health" {
return
}

if r.URL.Path == "/testbasicauth" {
// Check if the 'Authorization' header is set for basic authentication
authHeader := r.Header.Get("Authorization")
if authHeader != "Basic dGVzdDp0ZXN0" {
// If the header is incorrect, return an authentication error message
w.WriteHeader(http.StatusUnauthorized)
w.Write([]byte("Error: Authentication failed."))
return
}
}

upgrader := websocket.Upgrader{
CheckOrigin: func(r *http.Request) bool { return true },
}
conn, err := upgrader.Upgrade(w, r, nil)
if err != nil {
log.Println(err)
return
}
defer conn.Close()

var responseMessage []map[string]string

if r.URL.Path == "/testbasicauth" {
// Check if the 'Authorization' header is set for basic authentication
authHeader := r.Header.Get("Authorization")
if authHeader == "Basic dGVzdDp0ZXN0" {
// If the header is correct, return a success message
responseMessage = []map[string]string{
{
"message": "You are now authenticated to the WebSocket server.",
},
}
}
} else if r.URL.Path == "/test" {
// Return a success message
responseMessage = []map[string]string{
{
"ts": "2024-01-01T01:00:00.000000-00:00",
"data": "testdata1",
"id": "test1234567891",
},
{
"ts": "2024-01-01T02:00:00.000000-00:00",
"data": "testdata2",
"id": "test1234567890",
},
}
}

// Send a message to the client upon successful WebSocket connection
err = conn.WriteJSON(responseMessage)
if err != nil {
log.Println("write:", err)
return
}
}
9 changes: 9 additions & 0 deletions packages/websocket/_dev/test/system/test-auth-config.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
input: websocket
service: websocket-mock-service
vars:
url: ws://{{Hostname}}:{{Port}}/testbasicauth
basic_token: "dGVzdDp0ZXN0"
program: |
bytes(state.response).decode_json().as(body,{
"events": body,
})
13 changes: 13 additions & 0 deletions packages/websocket/_dev/test/system/test-get-config.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
input: websocket
service: websocket-mock-service
vars:
url: ws://{{Hostname}}:{{Port}}/test
program: |
bytes(state.response).decode_json().as(body,{
"events": body.map(e, {
"message": e.encode_json(),
}),
"cursor": {
"max_ts": body.map(e, e.ts).max()
}
})
60 changes: 60 additions & 0 deletions packages/websocket/agent/input/websocket.yml.hbs
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
data_stream:
dataset: {{data_stream.dataset}}
{{#if ssl}}
resource.ssl: {{ssl}}
{{/if}}
url: {{url}}

program: {{escape_string program}}

{{#if state}}
state:
{{state}}
{{/if}}
redact.delete: {{delete_redacted_fields}}
{{#if redact_fields}}
redact.fields:
{{#each redact_fields as |field|}}
- {{field}}
{{/each}}
{{/if}}

{{#if regexp}}
regexp:
{{regexp}}
{{/if}}

{{#if basic_token}}
auth.basic_token: {{basic_token}}
{{else if bearer_token}}
auth.bearer_token: {{bearer_token}}
{{/if}}

{{#unless basic_token}}
{{#unless bearer_token}}
{{#if header_key}}
auth.custom.header: {{header_key}}
{{/if}}
{{#if header_value}}
auth.custom.value: {{header_value}}
{{/if}}
{{/unless}}
{{/unless}}

{{#if pipeline}}
pipeline: {{pipeline}}
{{/if}}

{{#if tags}}
tags:
{{#each tags as |tag|}}
- {{tag}}
{{/each}}
{{/if}}
{{#contains "forwarded" tags}}
publisher_pipeline.disable_host: true
{{/contains}}
{{#if processors}}
processors:
{{processors}}
{{/if}}
6 changes: 6 additions & 0 deletions packages/websocket/changelog.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
# newer versions go on top
- version: "0.1.0"
changes:
- description: Initial Implementation.
type: enhancement
link: https://github.com/elastic/integrations/pull/9926
27 changes: 27 additions & 0 deletions packages/websocket/docs/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# Custom WebSocket Input

The WebSocket input integration enables ingestion of real-time data from WebSocket servers. WebSockets provide a full-duplex communication channel over a single, long-lived connection, which makes it suitable for scenarios where low latency data transmission is required.

This input type connects to a WebSocket URL, listens for messages sent by the server, and processes these messages as they arrive. The data is then published to Elasticsearch, making it searchable and analyzable in near real-time.

## Configuration

The full documentation for configuring the WebSocket input can be found [here](https://www.elastic.co/guide/en/beats/filebeat/current/filebeat-input-websocket.html).

To configure the WebSocket input, specify the connection URL and other optional parameters such as headers for authentication or protocol versions. Advanced options for connection handling, such as timeouts and subprotocols, can be configured in the "Advanced options" section.

### Example Configuration

Here is a basic example of how to configure the WebSocket input:

![Configuration Page](../img/websocket_configuration.png)

This configuration establishes a WebSocket connection to ws://localhost:443/v1/stream and uses basic authentication.

## Data Processing

The WebSocket input will consume messages from the server as they are transmitted. These messages are expected to be in a format that Filebeat can process, such as JSON. If the message format is different, you may need to define a processor to parse and structure the data before it is sent to Elasticsearch.

## Connection Management

The WebSocket input manages the connection to the WebSocket server, including automatically reconnecting if the connection is lost. The input does not maintain any state between restarts, so if the server sends historical data, it will be re-ingested upon reconnection.
16 changes: 16 additions & 0 deletions packages/websocket/fields/base-fields.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
- name: data_stream.type
type: constant_keyword
description: Data stream type.
- name: data_stream.dataset
type: constant_keyword
description: Data stream dataset.
- name: data_stream.namespace
type: constant_keyword
description: Data stream namespace.
- name: event.module
type: constant_keyword
description: Event module.
value: websocket
- name: '@timestamp'
type: date
description: Event timestamp.
9 changes: 9 additions & 0 deletions packages/websocket/fields/beats.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
- name: input.type
type: keyword
description: Type of filebeat input.
- name: log.offset
type: long
description: Log offset.
- name: tags
type: keyword
description: User defined tags.
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Loading

0 comments on commit 9b26696

Please sign in to comment.