fix: refactor enrich #1100

Merged

merged 1 commit into from Oct 18, 2024

Changes from all commits
8 changes: 8 additions & 0 deletions .github/workflows/ci-main.yaml
@@ -140,6 +140,10 @@ jobs:
     steps:
       - name: Checkout Project
         uses: actions/checkout@v4
+      - name: Setup python
+        uses: actions/setup-python@v5
+        with:
+          python-version: "3.10"
       - name: run install_microk8s.sh
         run: |
           sudo snap install microk8s --classic --channel=1.30/stable
@@ -162,6 +166,10 @@ jobs:
     steps:
       - name: Checkout Project
         uses: actions/checkout@v4
+      - name: Setup python
+        uses: actions/setup-python@v5
+        with:
+          python-version: "3.10"
       - name: Install docker compose
         run: |
           # Add Docker's official GPG key:
16 changes: 3 additions & 13 deletions poetry.lock

Some generated files are not rendered by default.

2 changes: 1 addition & 1 deletion pyproject.toml
@@ -21,7 +21,7 @@ testpaths = ["test"]
 python_files = ["test_*.py"]
 
 [tool.poetry.dependencies]
-python = "^3.8"
+python = ">=3.8,<3.12"
 pymongo = {extras = ["srv"], version = "^4.0.0"}
 requests = {extras = ["crypto"], version = "^2.31.0"}
 celery = {extras = ["tblib"], version = "5.4.0"}
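Note on the constraint: Poetry's caret "^3.8" is equivalent to ">=3.8,<4.0", so the new range tightens only the upper bound, capping support below Python 3.12 (the CI jobs above pin 3.10, inside that range). A minimal runtime guard equivalent to the new constraint; this is a hypothetical sketch, not code from this change:

import sys

# Mirrors the pyproject constraint >=3.8,<3.12 (illustrative only,
# not part of this PR).
if not ((3, 8) <= sys.version_info[:2] < (3, 12)):
    raise RuntimeError(
        f"Python {sys.version.split()[0]} is unsupported; need >=3.8,<3.12"
    )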
166 changes: 107 additions & 59 deletions splunk_connect_for_snmp/enrich/tasks.py
@@ -96,16 +96,8 @@ def enrich(self, result):
     updates = []
     attribute_updates = []
 
-    current_target = targets_collection.find_one(
-        {"address": address}, {"target": True, "sysUpTime": True}
-    )
-    if current_target is None:
-        logger.info(f"First time for {address}")
-        current_target = {"address": address}
-    else:
-        logger.info(f"Not first time for {address}")
+    current_target = get_current_target(address, targets_collection)
 
     # TODO: Compare the ts field with the lastmodified time of record and only update if we are newer
     check_restart(current_target, result["result"], targets_collection, address)
     logger.info(f"After check_restart for {address}")
     # First write back to DB new/changed data
@@ -136,46 +128,17 @@ def enrich(self, result):
             upsert=True,
         )
         new_fields = []
-        for field_key, field_value in group_data["fields"].items():
-            field_key_hash = field_key.replace(".", "|")
-            field_value["name"] = field_key
-            cv = None
-            if current_attributes and field_key_hash in current_attributes.get(
-                "fields", {}
-            ):
-                cv = current_attributes["fields"][field_key_hash]
-
-            # if new field_value is different than the previous one, update
-            if cv and cv != field_value:
-                # modifed
-                attribute_updates.append(
-                    {"$set": {f"fields.{field_key_hash}": field_value}}
-                )
-
-            elif cv:
-                # unchanged
-                pass
-            else:
-                # new
-                new_fields.append({"$set": {f"fields.{field_key_hash}": field_value}})
-                if field_key in TRACKED_F:
-                    updates.append(
-                        {"$set": {f"state.{field_key.replace('.', '|')}": field_value}}
-                    )
-
-            if len(updates) >= MONGO_UPDATE_BATCH_THRESHOLD:
-                targets_collection.update_one(
-                    {"address": address}, updates, upsert=True
-                )
-                updates.clear()
-
-            if len(attribute_updates) >= MONGO_UPDATE_BATCH_THRESHOLD:
-                attributes_collection.update_one(
-                    {"address": address, "group_key_hash": group_key_hash},
-                    attribute_updates,
-                    upsert=True,
-                )
-                attribute_updates.clear()
+        set_attribute_updates(
+            address,
+            attribute_updates,
+            attributes_collection,
+            current_attributes,
+            group_data,
+            group_key_hash,
+            new_fields,
+            targets_collection,
+            updates,
+        )
         if new_fields:
             attributes_bulk_write_operations.append(
                 UpdateOne(
@@ -186,15 +149,14 @@
             )
             new_fields.clear()
 
-    if updates:
-        targets_collection.update_one({"address": address}, updates, upsert=True)
-        updates.clear()
-    if attribute_updates:
-        attributes_collection.update_one(
-            {"address": address, "group_key_hash": group_key_hash},
-            attribute_updates,
-        )
-        attribute_updates.clear()
+    update_collections(
+        address,
+        attribute_updates,
+        attributes_collection,
+        group_key_hash,
+        targets_collection,
+        updates,
+    )
 
     # Now add back any fields we need
     if current_attributes:
@@ -203,6 +165,23 @@
             if attribute_group_id in result["result"]:
                 snmp_object = result["result"][attribute_group_id]
                 enrich_metric_with_fields_from_db(snmp_object, fields)
+    bulk_write_attributes(attributes_bulk_write_operations, attributes_collection)
+    return result
+
+
+def get_current_target(address, targets_collection):
+    current_target = targets_collection.find_one(
+        {"address": address}, {"target": True, "sysUpTime": True}
+    )
+    if current_target is None:
+        logger.info(f"First time for {address}")
+        current_target = {"address": address}
+    else:
+        logger.info(f"Not first time for {address}")
+    return current_target
+
+
+def bulk_write_attributes(attributes_bulk_write_operations, attributes_collection):
     if attributes_bulk_write_operations:
         logger.debug("Start of bulk_write")
         start = time.time()
@@ -214,7 +193,76 @@
             f"ELAPSED TIME OF BULK: {end - start} for {len(attributes_bulk_write_operations)} operations"
         )
         logger.debug(f"result api: {bulk_result.bulk_api_result}")
-    return result
+
+
+def update_collections(
+    address,
+    attribute_updates,
+    attributes_collection,
+    group_key_hash,
+    targets_collection,
+    updates,
+):
+    if updates:
+        targets_collection.update_one({"address": address}, updates, upsert=True)
+        updates.clear()
+    if attribute_updates:
+        attributes_collection.update_one(
+            {"address": address, "group_key_hash": group_key_hash},
+            attribute_updates,
+        )
+        attribute_updates.clear()
+
+
+def set_attribute_updates(
+    address,
+    attribute_updates,
+    attributes_collection,
+    current_attributes,
+    group_data,
+    group_key_hash,
+    new_fields,
+    targets_collection,
+    updates,
+):
+    for field_key, field_value in group_data["fields"].items():
+        field_key_hash = field_key.replace(".", "|")
+        field_value["name"] = field_key
+        cv = None
+        if current_attributes and field_key_hash in current_attributes.get(
+            "fields", {}
+        ):
+            cv = current_attributes["fields"][field_key_hash]
+
+        # if new field_value is different than the previous one, update
+        if cv and cv != field_value:
+            # modifed
+            attribute_updates.append(
+                {"$set": {f"fields.{field_key_hash}": field_value}}
+            )
+
+        elif cv:
+            # unchanged
+            pass
+        else:
+            # new
+            new_fields.append({"$set": {f"fields.{field_key_hash}": field_value}})
+            if field_key in TRACKED_F:
+                updates.append(
+                    {"$set": {f"state.{field_key.replace('.', '|')}": field_value}}
+                )
+
+        if len(updates) >= MONGO_UPDATE_BATCH_THRESHOLD:
+            targets_collection.update_one({"address": address}, updates, upsert=True)
+            updates.clear()
+
+        if len(attribute_updates) >= MONGO_UPDATE_BATCH_THRESHOLD:
+            attributes_collection.update_one(
+                {"address": address, "group_key_hash": group_key_hash},
+                attribute_updates,
+                upsert=True,
+            )
+            attribute_updates.clear()
 
 
 def enrich_metric_with_fields_from_db(snmp_object, fields_from_db):
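Note on the refactor: enrich now delegates to four extracted helpers (get_current_target, set_attribute_updates, update_collections, bulk_write_attributes) with the same behavior as the old inline code. The flushing logic works because pymongo's update_one accepts a list of {"$set": ...} stages as an aggregation-pipeline update (MongoDB 4.2+), so a batch of accumulated stages is applied in one call. A standalone sketch of that pattern follows; the threshold value and connection details are assumptions, not values from this change:

from pymongo import MongoClient

MONGO_UPDATE_BATCH_THRESHOLD = 100  # assumed value; the real constant comes from the package config


def flush_if_full(collection, address, updates, force=False):
    # update_one takes the list of {"$set": ...} stages as an
    # aggregation-pipeline update (MongoDB 4.2+), applying the whole
    # accumulated batch in a single round trip.
    if updates and (force or len(updates) >= MONGO_UPDATE_BATCH_THRESHOLD):
        collection.update_one({"address": address}, updates, upsert=True)
        updates.clear()


client = MongoClient("mongodb://localhost:27017")  # assumed local instance
targets = client["sc4snmp"]["targets"]  # assumed database/collection names
batch = [{"$set": {"state.SNMPv2-MIB|sysDescr": {"value": "linux"}}}]
flush_if_full(targets, "192.168.0.1:161", batch, force=True)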
52 changes: 44 additions & 8 deletions test/enrich/test_enrich.py
@@ -1,14 +1,16 @@
 from unittest import TestCase
-from unittest.mock import patch
+from unittest.mock import MagicMock, patch
 
 from splunk_connect_for_snmp.enrich.tasks import (
     enrich,
     enrich_metric_with_fields_from_db,
+    get_current_target,
+    logger,
 )
 
 attributes = {
     "id": "GROUP1",
-    "address": "192.168.0.1",
+    "address": "192.168.0.1:161",
     "fields": {
         "SNMPv2-MIB|sysDescr": {
             "time": 1234,
@@ -36,7 +38,7 @@
 
 attributes2 = {
     "id": "GROUP2",
-    "address": "192.168.0.1",
+    "address": "192.168.0.1:161",
     "fields": {
         "UDP-MIB|extraAttr": {
             "time": 1234,
@@ -49,7 +51,7 @@
 }
 
 input_dict = {
-    "address": "192.168.0.1",
+    "address": "192.168.0.1:161",
     "result": {
         "GROUP1": {
             "fields": {
@@ -92,7 +94,7 @@
 
 input_enrich = {
     "time": 1676291976.2939305,
-    "address": "54.91.99.113",
+    "address": "54.91.99.113:161",
     "result": {
         "ENTITY-MIB::int=1": {
             "metrics": {},
@@ -232,7 +234,7 @@ class TestEnrich(TestCase):
     @patch("pymongo.collection.Collection.bulk_write")
     @patch("splunk_connect_for_snmp.enrich.tasks.check_restart")
     def test_enrich(self, m_check_restart, bulk_write, m_update_one, m_find_one):
-        current_target = {"address": "192.168.0.1"}
+        current_target = {"address": "192.168.0.1:161"}
         m_find_one.side_effect = [current_target, True, attributes, attributes2, {}]
 
         result = enrich(input_dict)
@@ -281,7 +283,7 @@ def test_enrich(self, m_check_restart, bulk_write, m_update_one, m_find_one):
             result["result"]["GROUP2"]["fields"]["UDP-MIB.extraAttr"],
         )
 
-        self.assertEqual("192.168.0.1", result["address"])
+        self.assertEqual("192.168.0.1:161", result["address"])
 
         m_check_restart.assert_called()
         bulk_write.assert_called()
@@ -323,7 +325,7 @@ def test_enrich_no_target(
         )
 
         bulk_write.assert_called()
-        self.assertEqual("192.168.0.1", result["address"])
+        self.assertEqual("192.168.0.1:161", result["address"])
 
     def test_enrich_metric_with_fields_from_db(self):
         additional_field = {
@@ -480,3 +482,37 @@ def test_enrich_metric_with_fields_no_metrics(self):
         result = snmp_object.copy()
         enrich_metric_with_fields_from_db(snmp_object, additional_field)
         self.assertEqual(result, snmp_object)
+
+    def test_get_current_target(self):
+        address = "127.0.0.1:161"
+        targets_collection = MagicMock()
+        targets_collection.find_one.side_effect = [
+            address,
+            True,
+            attributes,
+            attributes2,
+            {},
+        ]
+        with self.assertLogs(logger, level="INFO") as logs:
+            current_address = get_current_target(address, targets_collection)
+        self.assertEqual("127.0.0.1:161", current_address)
+        self.assertEqual(
+            [
+                "INFO:splunk_connect_for_snmp.enrich.tasks:Not first time for 127.0.0.1:161"
+            ],
+            logs.output,
+        )
+
+    def test_get_current_target_empty_find(self):
+        address = "127.0.0.1:161"
+        targets_collection = MagicMock()
+        targets_collection.find_one.return_value = None
+        with self.assertLogs(logger, level="INFO") as logs:
+            current_address = get_current_target(address, targets_collection)
+        self.assertEqual({"address": "127.0.0.1:161"}, current_address)
+        self.assertEqual(
+            [
+                "INFO:splunk_connect_for_snmp.enrich.tasks:First time for 127.0.0.1:161"
+            ],
+            logs.output,
+        )
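The new tests drive get_current_target through a MagicMock collection and assert on the captured log lines. A self-contained sketch of the assertLogs pattern they rely on, as a toy test rather than part of the suite:

import logging
import unittest

logger = logging.getLogger("splunk_connect_for_snmp.enrich.tasks")


class AssertLogsSketch(unittest.TestCase):
    def test_first_time_message(self):
        # assertLogs captures records emitted on the given logger and
        # exposes them as "LEVEL:logger.name:message" strings.
        with self.assertLogs(logger, level="INFO") as logs:
            logger.info("First time for 127.0.0.1:161")
        self.assertEqual(
            ["INFO:splunk_connect_for_snmp.enrich.tasks:First time for 127.0.0.1:161"],
            logs.output,
        )


if __name__ == "__main__":
    unittest.main()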