Commit

fix: refactor enrich
ajasnosz committed Oct 15, 2024
1 parent 948e831 commit d10ee2b
Showing 5 changed files with 163 additions and 81 deletions.
8 changes: 8 additions & 0 deletions .github/workflows/ci-main.yaml
@@ -140,6 +140,10 @@ jobs:
steps:
- name: Checkout Project
uses: actions/checkout@v4
- name: Setup python
uses: actions/setup-python@v5
with:
python-version: "3.10"
- name: run install_microk8s.sh
run: |
sudo snap install microk8s --classic --channel=1.30/stable
@@ -162,6 +166,10 @@ jobs:
steps:
- name: Checkout Project
uses: actions/checkout@v4
- name: Setup python
uses: actions/setup-python@v5
with:
python-version: "3.10"
- name: Install docker compose
run: |
# Add Docker's official GPG key:
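Both CI jobs above now provision Python explicitly with actions/setup-python@v5 pinned to 3.10, a version inside the tightened ">=3.8,<3.12" constraint introduced in pyproject.toml below. As a minimal, hypothetical illustration (not part of this commit), a follow-up step could assert that the job received the interpreter it asked for:

# Hypothetical post-setup sanity check; not part of this workflow.
import sys

# The jobs request "3.10", so fail fast if a different interpreter
# ended up on PATH.
assert sys.version_info[:2] == (3, 10), f"expected Python 3.10, got {sys.version}"
print(f"Running under Python {sys.version.split()[0]}")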
16 changes: 3 additions & 13 deletions poetry.lock

Some generated files are not rendered by default.

2 changes: 1 addition & 1 deletion pyproject.toml
@@ -21,7 +21,7 @@ testpaths = ["test"]
python_files = ["test_*.py"]

[tool.poetry.dependencies]
python = "^3.8"
python = ">=3.8,<3.12"
pymongo = {extras = ["srv"], version = "^4.0.0"}
requests = {extras = ["crypto"], version = "^2.31.0"}
celery = {extras = ["tblib"], version = "5.4.0"}
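The python constraint tightens from ^3.8 (which Poetry resolves to >=3.8,<4.0) to >=3.8,<3.12, explicitly excluding Python 3.12. A quick, purely illustrative way to evaluate versions against the new range, using the third-party packaging library rather than anything from this repository:

# Illustrative only: check interpreter versions against the new constraint.
from packaging.specifiers import SpecifierSet

spec = SpecifierSet(">=3.8,<3.12")
for candidate in ("3.8", "3.10", "3.11", "3.12"):
    status = "allowed" if candidate in spec else "excluded"
    print(f"Python {candidate}: {status}")  # 3.12 prints "excluded"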
166 changes: 107 additions & 59 deletions splunk_connect_for_snmp/enrich/tasks.py
@@ -96,16 +96,8 @@ def enrich(self, result):
updates = []
attribute_updates = []

current_target = targets_collection.find_one(
{"address": address}, {"target": True, "sysUpTime": True}
)
if current_target is None:
logger.info(f"First time for {address}")
current_target = {"address": address}
else:
logger.info(f"Not first time for {address}")
current_target = get_current_target(address, targets_collection)

# TODO: Compare the ts field with the lastmodified time of record and only update if we are newer
check_restart(current_target, result["result"], targets_collection, address)
logger.info(f"After check_restart for {address}")
# First write back to DB new/changed data
@@ -136,46 +128,17 @@ def enrich(self, result):
upsert=True,
)
new_fields = []
for field_key, field_value in group_data["fields"].items():
field_key_hash = field_key.replace(".", "|")
field_value["name"] = field_key
cv = None
if current_attributes and field_key_hash in current_attributes.get(
"fields", {}
):
cv = current_attributes["fields"][field_key_hash]

# if the new field_value is different from the previous one, update
if cv and cv != field_value:
# modified
attribute_updates.append(
{"$set": {f"fields.{field_key_hash}": field_value}}
)

elif cv:
# unchanged
pass
else:
# new
new_fields.append({"$set": {f"fields.{field_key_hash}": field_value}})
if field_key in TRACKED_F:
updates.append(
{"$set": {f"state.{field_key.replace('.', '|')}": field_value}}
)

if len(updates) >= MONGO_UPDATE_BATCH_THRESHOLD:
targets_collection.update_one(
{"address": address}, updates, upsert=True
)
updates.clear()

if len(attribute_updates) >= MONGO_UPDATE_BATCH_THRESHOLD:
attributes_collection.update_one(
{"address": address, "group_key_hash": group_key_hash},
attribute_updates,
upsert=True,
)
attribute_updates.clear()
set_attribute_updates(
address,
attribute_updates,
attributes_collection,
current_attributes,
group_data,
group_key_hash,
new_fields,
targets_collection,
updates,
)
if new_fields:
attributes_bulk_write_operations.append(
UpdateOne(
@@ -186,15 +149,14 @@
)
new_fields.clear()

if updates:
targets_collection.update_one({"address": address}, updates, upsert=True)
updates.clear()
if attribute_updates:
attributes_collection.update_one(
{"address": address, "group_key_hash": group_key_hash},
attribute_updates,
)
attribute_updates.clear()
update_collections(
address,
attribute_updates,
attributes_collection,
group_key_hash,
targets_collection,
updates,
)

# Now add back any fields we need
if current_attributes:
@@ -203,6 +165,23 @@
if attribute_group_id in result["result"]:
snmp_object = result["result"][attribute_group_id]
enrich_metric_with_fields_from_db(snmp_object, fields)
bulk_write_attributes(attributes_bulk_write_operations, attributes_collection)
return result


def get_current_target(address, targets_collection):
current_target = targets_collection.find_one(
{"address": address}, {"target": True, "sysUpTime": True}
)
if current_target is None:
logger.info(f"First time for {address}")
current_target = {"address": address}
else:
logger.info(f"Not first time for {address}")
return current_target


def bulk_write_attributes(attributes_bulk_write_operations, attributes_collection):
if attributes_bulk_write_operations:
logger.debug("Start of bulk_write")
start = time.time()
@@ -214,7 +193,76 @@ def enrich(self, result):
f"ELAPSED TIME OF BULK: {end - start} for {len(attributes_bulk_write_operations)} operations"
)
logger.debug(f"result api: {bulk_result.bulk_api_result}")
return result


def update_collections(
address,
attribute_updates,
attributes_collection,
group_key_hash,
targets_collection,
updates,
):
if updates:
targets_collection.update_one({"address": address}, updates, upsert=True)
updates.clear()
if attribute_updates:
attributes_collection.update_one(
{"address": address, "group_key_hash": group_key_hash},
attribute_updates,
)
attribute_updates.clear()


def set_attribute_updates(
address,
attribute_updates,
attributes_collection,
current_attributes,
group_data,
group_key_hash,
new_fields,
targets_collection,
updates,
):
for field_key, field_value in group_data["fields"].items():
field_key_hash = field_key.replace(".", "|")
field_value["name"] = field_key
cv = None
if current_attributes and field_key_hash in current_attributes.get(
"fields", {}
):
cv = current_attributes["fields"][field_key_hash]

# if the new field_value is different from the previous one, update
if cv and cv != field_value:
# modified
attribute_updates.append(
{"$set": {f"fields.{field_key_hash}": field_value}}
)

elif cv:
# unchanged
pass
else:
# new
new_fields.append({"$set": {f"fields.{field_key_hash}": field_value}})
if field_key in TRACKED_F:
updates.append(
{"$set": {f"state.{field_key.replace('.', '|')}": field_value}}
)

if len(updates) >= MONGO_UPDATE_BATCH_THRESHOLD:
targets_collection.update_one({"address": address}, updates, upsert=True)
updates.clear()

if len(attribute_updates) >= MONGO_UPDATE_BATCH_THRESHOLD:
attributes_collection.update_one(
{"address": address, "group_key_hash": group_key_hash},
attribute_updates,
upsert=True,
)
attribute_updates.clear()


def enrich_metric_with_fields_from_db(snmp_object, fields_from_db):
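The refactor moves the inline lookup, batching, and flush logic of enrich into the helpers get_current_target, set_attribute_updates, update_collections, and bulk_write_attributes with the same behavior: queued $set documents are applied as a pipeline-style update once MONGO_UPDATE_BATCH_THRESHOLD is reached, and any remainder is flushed afterwards. A stripped-down sketch of that queue-and-flush pattern (function names and the threshold value here are illustrative, and a pymongo-style collection object is assumed):

# Illustrative sketch of the queue-and-flush pattern used above.
BATCH_THRESHOLD = 100  # stand-in for MONGO_UPDATE_BATCH_THRESHOLD

def queue_set(updates, collection, filter_doc, field, value):
    """Queue one $set stage; flush once the batch reaches the threshold."""
    updates.append({"$set": {field: value}})
    if len(updates) >= BATCH_THRESHOLD:
        # A list of stages makes this an aggregation-pipeline update (MongoDB 4.2+).
        collection.update_one(filter_doc, updates, upsert=True)
        updates.clear()

def flush(updates, collection, filter_doc):
    """Apply whatever is still queued."""
    if updates:
        collection.update_one(filter_doc, updates, upsert=True)
        updates.clear()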
52 changes: 44 additions & 8 deletions test/enrich/test_enrich.py
@@ -1,14 +1,16 @@
from unittest import TestCase
from unittest.mock import patch
from unittest.mock import MagicMock, patch

from splunk_connect_for_snmp.enrich.tasks import (
enrich,
enrich_metric_with_fields_from_db,
get_current_target,
logger,
)

attributes = {
"id": "GROUP1",
"address": "192.168.0.1",
"address": "192.168.0.1:161",
"fields": {
"SNMPv2-MIB|sysDescr": {
"time": 1234,
Expand Down Expand Up @@ -36,7 +38,7 @@

attributes2 = {
"id": "GROUP2",
"address": "192.168.0.1",
"address": "192.168.0.1:161",
"fields": {
"UDP-MIB|extraAttr": {
"time": 1234,
@@ -49,7 +51,7 @@
}

input_dict = {
"address": "192.168.0.1",
"address": "192.168.0.1:161",
"result": {
"GROUP1": {
"fields": {
Expand Down Expand Up @@ -92,7 +94,7 @@

input_enrich = {
"time": 1676291976.2939305,
"address": "54.91.99.113",
"address": "54.91.99.113:161",
"result": {
"ENTITY-MIB::int=1": {
"metrics": {},
Expand Down Expand Up @@ -232,7 +234,7 @@ class TestEnrich(TestCase):
@patch("pymongo.collection.Collection.bulk_write")
@patch("splunk_connect_for_snmp.enrich.tasks.check_restart")
def test_enrich(self, m_check_restart, bulk_write, m_update_one, m_find_one):
current_target = {"address": "192.168.0.1"}
current_target = {"address": "192.168.0.1:161"}
m_find_one.side_effect = [current_target, True, attributes, attributes2, {}]

result = enrich(input_dict)
@@ -281,7 +283,7 @@ def test_enrich(self, m_check_restart, bulk_write, m_update_one, m_find_one):
result["result"]["GROUP2"]["fields"]["UDP-MIB.extraAttr"],
)

self.assertEqual("192.168.0.1", result["address"])
self.assertEqual("192.168.0.1:161", result["address"])

m_check_restart.assert_called()
bulk_write.assert_called()
@@ -323,7 +325,7 @@ def test_enrich_no_target(
)

bulk_write.assert_called()
self.assertEqual("192.168.0.1", result["address"])
self.assertEqual("192.168.0.1:161", result["address"])

def test_enrich_metric_with_fields_from_db(self):
additional_field = {
@@ -480,3 +482,37 @@ def test_enrich_metric_with_fields_no_metrics(self):
result = snmp_object.copy()
enrich_metric_with_fields_from_db(snmp_object, additional_field)
self.assertEqual(result, snmp_object)

def test_get_current_target(self):
address = "127.0.0.1:161"
targets_collection = MagicMock()
targets_collection.find_one.side_effect = [
address,
True,
attributes,
attributes2,
{},
]
with self.assertLogs(logger, level="INFO") as logs:
current_address = get_current_target(address, targets_collection)
self.assertEqual("127.0.0.1:161", current_address)
self.assertEqual(
[
"INFO:splunk_connect_for_snmp.enrich.tasks:Not first time for 127.0.0.1:161"
],
logs.output,
)

def test_get_current_target_empty_find(self):
address = "127.0.0.1:161"
targets_collection = MagicMock()
targets_collection.find_one.return_value = None
with self.assertLogs(logger, level="INFO") as logs:
current_address = get_current_target(address, targets_collection)
self.assertEqual({"address": "127.0.0.1:161"}, current_address)
self.assertEqual(
[
"INFO:splunk_connect_for_snmp.enrich.tasks:First time for 127.0.0.1:161"
],
logs.output,
)

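Both new tests stub the Mongo collection with a MagicMock and drive find_one through it. Note that side_effect hands out one list item per call, so in test_get_current_target only the first entry of the five-item list is ever consumed. A tiny self-contained illustration of that semantics (not from the repository):

# Minimal illustration of MagicMock.side_effect; not from the repository.
from unittest.mock import MagicMock

m = MagicMock()
m.find_one.side_effect = ["first", "second"]
print(m.find_one())  # 'first'  (each call consumes the next item)
print(m.find_one())  # 'second'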