Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feat: Add support for running Censys queries via analytics #988

Merged
merged 10 commits into from
Jan 25, 2024
Merged
1 change: 1 addition & 0 deletions core/schemas/indicator.py
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,7 @@ class QueryType(str, Enum):
osquery = "osquery"
sql = "sql"
splunk = "splunk"
censys = "censys"


class Query(Indicator):
Expand Down
58 changes: 58 additions & 0 deletions plugins/analytics/public/censys.py
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please give it a ruff formatter pass (We'll add a github action check soon)

Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
import logging
from typing import Optional
from datetime import timedelta

from core.schemas import task
from core import taskmanager
from core.config.config import yeti_config
from core.schemas.observable import Observable
from core.schemas import indicator
from censys.search import CensysHosts


class CensysApiQuery(task.AnalyticsTask):
_defaults = {
"name": "Censys",
"description": "Executes Censys queries (stored as indicators) and tags the returned IP addresses.",
"frequency": timedelta(hours=24),
}

def run(self):
api_key = yeti_config.get("censys", "api_key")
api_secret = yeti_config.get("censys", "secret")

if not (api_key and api_secret):
logging.error(
"Error: please configure an api_key and secret to use Censys analytics"
)
raise RuntimeError

hosts_api = CensysHosts(
api_id=api_key,
api_secret=api_secret,
)

censys_queries, _ = indicator.Query.filter(
{"query_type": indicator.QueryType.censys}
)

for query in censys_queries:
ip_addresses = self.query_censys(hosts_api, query.pattern)
for ip in ip_addresses:
ip_object = Observable.add_text(ip)
ip_object.tag(query.relevant_tags)
query.link_to(
ip_object, "censys", f"IP found with Censys query: {query.pattern}"
)

def query_censys(self, api: CensysHosts, query: str) -> set[Optional[str]]:
"""Queries Censys and returns all identified IP addresses."""
ip_addresses: set[Optional[str]] = set()
results = api.search(query, fields=["ip"], pages=-1)
for result in results:
ip_addresses.update(record.get("ip") for record in result)

return ip_addresses


taskmanager.TaskManager.register_task(CensysApiQuery)
113 changes: 112 additions & 1 deletion poetry.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ timesketch-api-client = "^20230721"
pyopenssl = "^23.3.0"
ipwhois = "^1.2.0"
maclookup = "^1.0.3"
censys = "^2.2.10"

[build-system]
requires = ["poetry-core"]
Expand Down
75 changes: 75 additions & 0 deletions tests/analytics_test.py
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same comment as the other file, please give this a ruff formatter pass

Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
import os
import unittest
from unittest.mock import patch, MagicMock
from plugins.analytics.public import censys
from core import database_arango
from censys.search import CensysHosts
from core.schemas import indicator
from core.config.config import yeti_config
from core.schemas.indicator import DiamondModel
from core.schemas.observable import ObservableType
from core.schemas import observable


class AnalyticsTest(unittest.TestCase):
@classmethod
def setUpClass(cls) -> None:
database_arango.db.connect(database="yeti_test")
database_arango.db.clear()

@patch("plugins.analytics.public.censys.CensysHosts")
def test_censys_query(self, mock_censys_hosts):
mock_hosts_api = MagicMock(spec=CensysHosts)
mock_censys_hosts.return_value = mock_hosts_api

os.environ["YETI_CENSYS_API_KEY"] = "test_api_key"
os.environ["YETI_CENSYS_SECRET"] = "test_secret"

censys_query = indicator.Query(
name="Censys test query name",
description="Censys test query description",
pattern="test_censys_query",
location="censys",
diamond=DiamondModel.infrastructure,
relevant_tags=["censys_query_tag"],
query_type=indicator.QueryType.censys,
).save()

mock_search_result = [
{"ip": "192.0.2.1"},
{"ip": "2001:db8:3333:4444:5555:6666:7777:8888"},
]
mock_hosts_api.search.return_value = [mock_search_result]

defaults = censys.CensysApiQuery._defaults.copy()
analytics = censys.CensysApiQuery(**defaults)

analytics.run()

mock_censys_hosts.assert_called_once()
mock_hosts_api.search.assert_called_once_with(
"test_censys_query", fields=["ip"], pages=-1
)

observables = observable.Observable.filter(
{"value": ""}, graph_queries=[("tags", "tagged", "outbound", "name")]
)
observable_obj, _ = observables

self.assertEqual(observable_obj[0].value, "192.0.2.1")
self.assertEqual(observable_obj[0].type, ObservableType.ipv4)
self.assertEqual(set(observable_obj[0].tags.keys()), {"censys_query_tag"})

self.assertEqual(
observable_obj[1].value, "2001:db8:3333:4444:5555:6666:7777:8888"
)
self.assertEqual(observable_obj[1].type, ObservableType.ipv6)
self.assertEqual(set(observable_obj[1].tags.keys()), {"censys_query_tag"})

query_neighbors = [o.value for o in censys_query.neighbors()[0].values()]
self.assertIn("192.0.2.1", query_neighbors)
self.assertIn("2001:db8:3333:4444:5555:6666:7777:8888", query_neighbors)


if __name__ == "__main__":
unittest.main()