Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

websocket: new generic integration #9926

Merged
merged 10 commits into from
Jul 17, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .github/CODEOWNERS
Validating CODEOWNERS rules …
Original file line number Diff line number Diff line change
Expand Up @@ -381,6 +381,7 @@
/packages/universal_profiling_symbolizer @elastic/obs-ds-intake-services
/packages/vectra_detect @elastic/security-service-integrations
/packages/vsphere @elastic/obs-infraobs-integrations
/packages/websocket @elastic/security-service-integrations
/packages/watchguard_firebox @elastic/sec-deployment-and-devices
/packages/websphere_application_server @elastic/obs-infraobs-integrations
/packages/windows @elastic/elastic-agent-data-plane @elastic/sec-windows-platform
Expand Down
4 changes: 4 additions & 0 deletions packages/websocket/_dev/build/build.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
dependencies:
ecs:
reference: [email protected]
import_mappings: true
27 changes: 27 additions & 0 deletions packages/websocket/_dev/build/docs/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# Custom WebSocket Input

The WebSocket input integration enables ingestion of real-time data from WebSocket servers. WebSockets provide a full-duplex communication channel over a single, long-lived connection, which makes it suitable for scenarios where low latency data transmission is required.

This input type connects to a WebSocket URL, listens for messages sent by the server, and processes these messages as they arrive. The data is then published to Elasticsearch, making it searchable and analyzable in near real-time.

## Configuration

The full documentation for configuring the WebSocket input can be found [here](https://www.elastic.co/guide/en/beats/filebeat/current/filebeat-input-websocket.html).

To configure the WebSocket input, specify the connection URL and other optional parameters such as headers for authentication or protocol versions. Advanced options for connection handling, such as timeouts and subprotocols, can be configured in the "Advanced options" section.

### Example Configuration

Here is a basic example of how to configure the WebSocket input:

![Configuration Page](../img/websocket_configuration.png)

This configuration establishes a WebSocket connection to ws://localhost:443/v1/stream and uses basic authentication.

## Data Processing

The WebSocket input will consume messages from the server as they are transmitted. These messages are expected to be in a format that Filebeat can process, such as JSON. If the message format is different, you may need to define a processor to parse and structure the data before it is sent to Elasticsearch.

## Connection Management

The WebSocket input manages the connection to the WebSocket server, including automatically reconnecting if the connection is lost. The input does not maintain any state between restarts, so if the server sends historical data, it will be re-ingested upon reconnection.
15 changes: 15 additions & 0 deletions packages/websocket/_dev/deploy/docker/docker-compose.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
version: "2.3"
services:
websocket-mock-service:
image: golang:1.21-alpine
hostname: websocket
working_dir: /app
volumes:
- ./websocket-mock-service:/app
ports:
- "3000:3000"
healthcheck:
test: "wget --no-verbose --tries=1 --spider http://localhost:3000/health || exit 1"
interval: 10s
timeout: 5s
command: ["go", "run", "main.go"]
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
module websocket-mock-service

go 1.21.3

require github.com/gorilla/websocket v1.5.1

require golang.org/x/net v0.17.0 // indirect
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
github.com/gorilla/websocket v1.5.1 h1:gmztn0JnHVt9JZquRuzLw3g4wouNVzKL15iLr/zn/QY=
github.com/gorilla/websocket v1.5.1/go.mod h1:x3kM2JMyaluk02fnUJpQuwD2dCS5NDG2ZHL0uE0tcaY=
golang.org/x/net v0.17.0 h1:pVaXccu2ozPjCXewfr1S7xza/zcXTity9cCdXQYSjIM=
golang.org/x/net v0.17.0/go.mod h1:NxSsAGuq816PNPmqtQdLE42eU2Fs7NoRIZrHJAlaCOE=
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please go fmt this code.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please reverse the order of function declarations. The convention in Go code is caller before callee.

Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License;
// you may not use this file except in compliance with the Elastic License.

package main
ShourieG marked this conversation as resolved.
Show resolved Hide resolved

import (
"log"
"net/http"

"github.com/gorilla/websocket"
)

func main() {
http.HandleFunc("/", handleWebSocket)
log.Fatal(http.ListenAndServe(":3000", nil))
}

func handleWebSocket(w http.ResponseWriter, r *http.Request) {

ShourieG marked this conversation as resolved.
Show resolved Hide resolved
if r.URL.Path == "/health" {
return
}

if r.URL.Path == "/testbasicauth" {
// Check if the 'Authorization' header is set for basic authentication
authHeader := r.Header.Get("Authorization")
if authHeader != "Basic dGVzdDp0ZXN0" {
// If the header is incorrect, return an authentication error message
w.WriteHeader(http.StatusUnauthorized)
w.Write([]byte("Error: Authentication failed."))
return
}
}

upgrader := websocket.Upgrader{
CheckOrigin: func(r *http.Request) bool { return true },
}
conn, err := upgrader.Upgrade(w, r, nil)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
conn, err := upgrader.Upgrade(w, r, nil)
upgrader := websocket.Upgrader{
CheckOrigin: func(r *http.Request) bool { return true },
}
conn, err := upgrader.Upgrade(w, r, nil)

if err != nil {
log.Println(err)
return
}
defer conn.Close()

var responseMessage []map[string]string

if r.URL.Path == "/testbasicauth" {
// Check if the 'Authorization' header is set for basic authentication
authHeader := r.Header.Get("Authorization")
if authHeader == "Basic dGVzdDp0ZXN0" {
// If the header is correct, return a success message
responseMessage = []map[string]string{
{
"message": "You are now authenticated to the WebSocket server.",
},
}
}
} else if r.URL.Path == "/test" {
// Return a success message
responseMessage = []map[string]string{
{
"ts": "2024-01-01T01:00:00.000000-00:00",
"data": "testdata1",
"id": "test1234567891",
},
{
"ts": "2024-01-01T02:00:00.000000-00:00",
"data": "testdata2",
"id": "test1234567890",
},
}
}

// Send a message to the client upon successful WebSocket connection
err = conn.WriteJSON(responseMessage)
if err != nil {
log.Println("write:", err)
return
}
}
9 changes: 9 additions & 0 deletions packages/websocket/_dev/test/system/test-auth-config.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
input: websocket
service: websocket-mock-service
vars:
url: ws://{{Hostname}}:{{Port}}/testbasicauth
basic_token: "dGVzdDp0ZXN0"
program: |
bytes(state.response).decode_json().as(body,{
"events": body,
})
13 changes: 13 additions & 0 deletions packages/websocket/_dev/test/system/test-get-config.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
input: websocket
service: websocket-mock-service
vars:
url: ws://{{Hostname}}:{{Port}}/test
program: |
bytes(state.response).decode_json().as(body,{
"events": body.map(e, {
"message": e.encode_json(),
}),
"cursor": {
"max_ts": body.map(e, e.ts).max()
}
})
60 changes: 60 additions & 0 deletions packages/websocket/agent/input/websocket.yml.hbs
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
data_stream:
dataset: {{data_stream.dataset}}
{{#if ssl}}
resource.ssl: {{ssl}}
{{/if}}
url: {{url}}

program: {{escape_string program}}

{{#if state}}
state:
{{state}}
{{/if}}
redact.delete: {{delete_redacted_fields}}
{{#if redact_fields}}
redact.fields:
{{#each redact_fields as |field|}}
- {{field}}
{{/each}}
{{/if}}

{{#if regexp}}
regexp:
{{regexp}}
{{/if}}

{{#if basic_token}}
auth.basic_token: {{basic_token}}
{{else if bearer_token}}
auth.bearer_token: {{bearer_token}}
{{/if}}

{{#unless basic_token}}
{{#unless bearer_token}}
{{#if header_key}}
auth.custom.header: {{header_key}}
{{/if}}
{{#if header_value}}
auth.custom.value: {{header_value}}
{{/if}}
{{/unless}}
{{/unless}}

{{#if pipeline}}
pipeline: {{pipeline}}
{{/if}}

{{#if tags}}
tags:
{{#each tags as |tag|}}
- {{tag}}
{{/each}}
{{/if}}
{{#contains "forwarded" tags}}
publisher_pipeline.disable_host: true
{{/contains}}
{{#if processors}}
processors:
{{processors}}
{{/if}}
6 changes: 6 additions & 0 deletions packages/websocket/changelog.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
# newer versions go on top
- version: "0.1.0"
changes:
- description: Initial Implementation.
type: enhancement
link: https://github.com/elastic/integrations/pull/9926
27 changes: 27 additions & 0 deletions packages/websocket/docs/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# Custom WebSocket Input

The WebSocket input integration enables ingestion of real-time data from WebSocket servers. WebSockets provide a full-duplex communication channel over a single, long-lived connection, which makes it suitable for scenarios where low latency data transmission is required.

This input type connects to a WebSocket URL, listens for messages sent by the server, and processes these messages as they arrive. The data is then published to Elasticsearch, making it searchable and analyzable in near real-time.

## Configuration

The full documentation for configuring the WebSocket input can be found [here](https://www.elastic.co/guide/en/beats/filebeat/current/filebeat-input-websocket.html).

To configure the WebSocket input, specify the connection URL and other optional parameters such as headers for authentication or protocol versions. Advanced options for connection handling, such as timeouts and subprotocols, can be configured in the "Advanced options" section.

### Example Configuration

Here is a basic example of how to configure the WebSocket input:

![Configuration Page](../img/websocket_configuration.png)

This configuration establishes a WebSocket connection to ws://localhost:443/v1/stream and uses basic authentication.

## Data Processing

The WebSocket input will consume messages from the server as they are transmitted. These messages are expected to be in a format that Filebeat can process, such as JSON. If the message format is different, you may need to define a processor to parse and structure the data before it is sent to Elasticsearch.

## Connection Management

The WebSocket input manages the connection to the WebSocket server, including automatically reconnecting if the connection is lost. The input does not maintain any state between restarts, so if the server sends historical data, it will be re-ingested upon reconnection.
16 changes: 16 additions & 0 deletions packages/websocket/fields/base-fields.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
- name: data_stream.type
type: constant_keyword
description: Data stream type.
- name: data_stream.dataset
type: constant_keyword
description: Data stream dataset.
- name: data_stream.namespace
type: constant_keyword
description: Data stream namespace.
- name: event.module
type: constant_keyword
description: Event module.
value: websocket
- name: '@timestamp'
type: date
description: Event timestamp.
9 changes: 9 additions & 0 deletions packages/websocket/fields/beats.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
- name: input.type
type: keyword
description: Type of filebeat input.
- name: log.offset
type: long
description: Log offset.
- name: tags
type: keyword
description: User defined tags.
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Loading