Skip to content

Commit

Permalink
Add events generator tool for wazuh-alerts (#152)
Browse files Browse the repository at this point in the history
* Add events generator tool for wazuh-alerts

* Fix typo in README.md

Signed-off-by: Álex Ruiz <[email protected]>

* Make timestamps timezone aware

---------

Signed-off-by: Álex Ruiz <[email protected]>
Co-authored-by: Fede Tux <[email protected]>
  • Loading branch information
AlexRuiz7 and f-galland authored Feb 15, 2024
1 parent 620d8b6 commit 4c58d98
Show file tree
Hide file tree
Showing 5 changed files with 1,257 additions and 0 deletions.
1 change: 1 addition & 0 deletions integrations/tools/events-generator/.gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
.venv
43 changes: 43 additions & 0 deletions integrations/tools/events-generator/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
### Events generator tool

This python tool provides functionality to generate and index sample events for Wazuh's indices.

#### Getting started

Create a virtual environment to install the dependencies of the project.

```console
python -m venv .venv
source .venv/bin/activate
pip install -r requirements.txt
```

Start the events' generator with `./run.py` or `python run.py`. The program takes no required
arguments, as it's configured with default values that will work in most cases during development.
To know more about its capabilities and arguments, display the help menu with `-h`.

As for now, this tool generates events for the `wazuh-alerts-4.x-*` and `wazuh-archives-4.x-*` indices.
Since 4.8.0, these indices are aliased to `wazuh-alerts` and `wazuh-archives`. If you need to, run the
[indexer-ism-init.sh](../../../distribution/src/bin/indexer-ism-init.sh) script to create them. This is important as, by default, the tool will write to
the `wazuh-alerts` alias. You may also need to create an **index pattern** in _dashboards_ in order to perform
queries to the index from the UI. To do that, go to Dashboards Management > Index Patterns > Create index pattern > wazuh-alerts-4.x-* > timestamp as Time field

Newer indices, like `wazuh-states-vulnerabilities`, are ECS compliant and use a dedicated events' generator.
You can find it in the [ecs](../../../ecs/) folder.


```console
python run.py -o indexer -c 5 -t 1
INFO:event_generator:Inventory created
INFO:event_generator:Publisher created
INFO:event_generator:Event created
{'_index': 'wazuh-alerts-4.x-2024.02.13-000001', '_id': 'dRWno40BZRXLJU5t0u6Z', '_version': 1, 'result': 'created', '_shards': {'total': 2, 'successful': 2, 'failed': 0}, '_seq_no': 168, '_primary_term': 1}
INFO:event_generator:Event created
{'_index': 'wazuh-alerts-4.x-2024.02.13-000001', '_id': 'dhWno40BZRXLJU5t1u6Y', '_version': 1, 'result': 'created', '_shards': {'total': 2, 'successful': 2, 'failed': 0}, '_seq_no': 169, '_primary_term': 1}
INFO:event_generator:Event created
{'_index': 'wazuh-alerts-4.x-2024.02.13-000001', '_id': 'dxWno40BZRXLJU5t2u6i', '_version': 1, 'result': 'created', '_shards': {'total': 2, 'successful': 2, 'failed': 0}, '_seq_no': 170, '_primary_term': 1}
INFO:event_generator:Event created
{'_index': 'wazuh-alerts-4.x-2024.02.13-000001', '_id': 'eBWno40BZRXLJU5t3u6v', '_version': 1, 'result': 'created', '_shards': {'total': 2, 'successful': 2, 'failed': 0}, '_seq_no': 171, '_primary_term': 1}
INFO:event_generator:Event created
{'_index': 'wazuh-alerts-4.x-2024.02.13-000001', '_id': 'eRWno40BZRXLJU5t4u66', '_version': 1, 'result': 'created', '_shards': {'total': 2, 'successful': 2, 'failed': 0}, '_seq_no': 172, '_primary_term': 1}
```
1 change: 1 addition & 0 deletions integrations/tools/events-generator/requirements.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
requests>=2.31.0
212 changes: 212 additions & 0 deletions integrations/tools/events-generator/run.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,212 @@
#!/usr/bin/pyton

# Events generator tool for Wazuh's indices.
# Chooses a random element from <index>/alerts.json to index
# (indexer, filebeat). Required. Destination of the events. Default: indexer.
# -c: Number of elements to push. Use 0 to run indefinitely. Default: 0
# -i: index name prefix or module (e.g: wazuh-alerts, wazuh-states-vulnerabilities)
# -t: interval between events in seconds. Default: 5
# when output is "indexer", the following parameters can be provided:
# -a: indexer's API IP address or hostname.
# -P: indexer's API port number.
# -u: username
# -p: password


from abc import ABC, abstractmethod
import argparse
import datetime
import logging
import random
import requests
import time
import json
import urllib3
# import OpenSearch.opensearchpy

logging.basicConfig(level=logging.NOTSET)
# Combination to supress certificates validation warning when verify=False
# https://github.com/influxdata/influxdb-python/issues/240#issuecomment-341313420
logging.getLogger("urllib3").setLevel(logging.ERROR)
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)

logger = logging.getLogger("event_generator")

# ================================================== #


class Inventory:
def __init__(self, path: str):
with open(path, "r") as fd:
self.elements = fd.readlines()
self.size = len(self.elements)

def get_random(self) -> str:
random.shuffle(self.elements)
return self.elements.pop()
# return self.elements[random.randint(0, self.size)]

# ================================================== #


class Publisher(ABC):
@abstractmethod
def publish(self, event: str):
pass

# ================================================== #


class PublisherClient(Publisher):
def __init__(self):
# self.client = OpenSearch(
# hosts...
# )
pass

# ================================================== #


class PublisherHttp(Publisher):
def __init__(self, address: str, port: int, path: str, user: str, password: str):
super()
self.address = address
self.port = port
self.path = path
self.username = user
self.password = password

def url(self) -> str:
return f"https://{self.address}:{self.port}/{self.path}/_doc"

def publish(self, event: str):
try:
result = requests.post(
self.url(),
auth=(self.username, self.password),
json=json.loads(event),
verify=False
)
print(result.json())
except json.JSONDecodeError as e:
logger.error("Error encoding event " +
event + "\n Caused by: " + e.msg)

# ================================================== #


class PublisherFilebeat(Publisher):
def __init__(self):
super()
self.path = "/var/ossec/logs/alerts/alerts.json"

def publish(self, event: str):
with open(self.path, "a") as fd:
fd.write(event)

# ================================================== #


class PublisherCreator:
@staticmethod
def create(publisher: str, args) -> Publisher:
if publisher == "indexer":
address = args["address"]
port = args["port"]
path = args["index"]
username = args["username"]
password = args["password"]

return PublisherHttp(address, port, path, username, password)
elif publisher == "filebeat":
return PublisherFilebeat()
else:
raise ValueError("Unsupported publisher type")

# ================================================== #


def date_now() -> str:
return datetime.datetime.now(datetime.timezone.utc).strftime("%Y-%m-%dT%H:%M:%S.%f")[:-3]+'+0000'

# ================================================== #


def parse_args():
parser = argparse.ArgumentParser(
description="Events generator tool for Wazuh's indices. Indexes a random element from <index>/alerts.json",
)
parser.add_argument(
'-o', '--output',
choices=['indexer', 'filebeat'],
default="indexer",
help="Destination of the events. Default: indexer."
)
parser.add_argument(
'-i', '--index',
default="wazuh-alerts",
help="Index name or module (e.g: wazuh-alerts, wazuh-states-vulnerabilities)"
)
# Infinite loop by default
parser.add_argument(
'-c', '--count',
default=0,
type=int,
help="Number of elements to push. Use 0 to run indefinitely. Default: 0"
)
# Interval of time between events
parser.add_argument(
'-t', '--time',
default=5,
type=int,
help="Interval between events in seconds. Default: 5"
)
parser.add_argument(
'-a', '--address',
default="localhost",
help="Indexer's API IP address or hostname."
)
parser.add_argument(
'-P', '--port',
default=9200,
type=int,
help="Indexer's API port number."
)
parser.add_argument(
'-u', '--username',
default="admin",
help="Indexer's username"
)
parser.add_argument(
'-p', '--password',
default="admin",
help="Indexer's password"
)
return parser.parse_args()


# ================================================== #


def main(args: dict):
inventory = Inventory(f"{args['index']}/alerts.json")
logger.info("Inventory created")
publisher = PublisherCreator.create(args["output"], args)
logger.info("Publisher created")

count = 0
max_iter = args["count"]
time_interval = args["time"]
while (count < max_iter or max_iter == 0):
chosen = inventory.get_random().replace("{timestamp}", date_now())
logger.info("Event created")
publisher.publish(chosen)

time.sleep(time_interval)
count += 1

# ================================================== #


if __name__ == '__main__':
main(vars(parse_args()))
Loading

0 comments on commit 4c58d98

Please sign in to comment.