diff --git a/.github/workflows/ci-main.yaml b/.github/workflows/ci-main.yaml
index 3314046c0..df5040f8d 100644
--- a/.github/workflows/ci-main.yaml
+++ b/.github/workflows/ci-main.yaml
@@ -140,6 +140,10 @@ jobs:
     steps:
       - name: Checkout Project
         uses: actions/checkout@v4
+      - name: Setup python
+        uses: actions/setup-python@v5
+        with:
+          python-version: "3.10"
       - name: run install_microk8s.sh
         run: |
           sudo snap install microk8s --classic --channel=1.30/stable
@@ -162,6 +166,10 @@ jobs:
     steps:
       - name: Checkout Project
         uses: actions/checkout@v4
+      - name: Setup python
+        uses: actions/setup-python@v5
+        with:
+          python-version: "3.10"
      - name: Install docker compose
        run: |
          # Add Docker's official GPG key:
diff --git a/poetry.lock b/poetry.lock
index faa6830c4..638fdd744 100644
--- a/poetry.lock
+++ b/poetry.lock
@@ -1,4 +1,4 @@
-# This file is automatically @generated by Poetry 1.8.2 and should not be changed by hand.
+# This file is automatically @generated by Poetry 1.8.3 and should not be changed by hand.
 
 [[package]]
 name = "amqp"
@@ -2242,16 +2242,6 @@ files = [
     {file = "wrapt-1.14.1-cp310-cp310-musllinux_1_1_x86_64.whl", hash = "sha256:8ad85f7f4e20964db4daadcab70b47ab05c7c1cf2a7c1e51087bfaa83831854c"},
     {file = "wrapt-1.14.1-cp310-cp310-win32.whl", hash = "sha256:a9a52172be0b5aae932bef82a79ec0a0ce87288c7d132946d645eba03f0ad8a8"},
     {file = "wrapt-1.14.1-cp310-cp310-win_amd64.whl", hash = "sha256:6d323e1554b3d22cfc03cd3243b5bb815a51f5249fdcbb86fda4bf62bab9e164"},
-    {file = "wrapt-1.14.1-cp311-cp311-macosx_10_9_x86_64.whl", hash = "sha256:ecee4132c6cd2ce5308e21672015ddfed1ff975ad0ac8d27168ea82e71413f55"},
-    {file = "wrapt-1.14.1-cp311-cp311-macosx_11_0_arm64.whl", hash = "sha256:2020f391008ef874c6d9e208b24f28e31bcb85ccff4f335f15a3251d222b92d9"},
-    {file = "wrapt-1.14.1-cp311-cp311-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:2feecf86e1f7a86517cab34ae6c2f081fd2d0dac860cb0c0ded96d799d20b335"},
-    {file = "wrapt-1.14.1-cp311-cp311-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:240b1686f38ae665d1b15475966fe0472f78e71b1b4903c143a842659c8e4cb9"},
-    {file = "wrapt-1.14.1-cp311-cp311-manylinux_2_5_x86_64.manylinux1_x86_64.manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:a9008dad07d71f68487c91e96579c8567c98ca4c3881b9b113bc7b33e9fd78b8"},
-    {file = "wrapt-1.14.1-cp311-cp311-musllinux_1_1_aarch64.whl", hash = "sha256:6447e9f3ba72f8e2b985a1da758767698efa72723d5b59accefd716e9e8272bf"},
-    {file = "wrapt-1.14.1-cp311-cp311-musllinux_1_1_i686.whl", hash = "sha256:acae32e13a4153809db37405f5eba5bac5fbe2e2ba61ab227926a22901051c0a"},
-    {file = "wrapt-1.14.1-cp311-cp311-musllinux_1_1_x86_64.whl", hash = "sha256:49ef582b7a1152ae2766557f0550a9fcbf7bbd76f43fbdc94dd3bf07cc7168be"},
-    {file = "wrapt-1.14.1-cp311-cp311-win32.whl", hash = "sha256:358fe87cc899c6bb0ddc185bf3dbfa4ba646f05b1b0b9b5a27c2cb92c2cea204"},
-    {file = "wrapt-1.14.1-cp311-cp311-win_amd64.whl", hash = "sha256:26046cd03936ae745a502abf44dac702a5e6880b2b01c29aea8ddf3353b68224"},
     {file = "wrapt-1.14.1-cp35-cp35m-manylinux1_i686.whl", hash = "sha256:43ca3bbbe97af00f49efb06e352eae40434ca9d915906f77def219b88e85d907"},
     {file = "wrapt-1.14.1-cp35-cp35m-manylinux1_x86_64.whl", hash = "sha256:6b1a564e6cb69922c7fe3a678b9f9a3c54e72b469875aa8018f18b4d1dd1adf3"},
     {file = "wrapt-1.14.1-cp35-cp35m-manylinux2010_i686.whl", hash = "sha256:00b6d4ea20a906c0ca56d84f93065b398ab74b927a7a3dbd470f6fc503f95dc3"},
@@ -2316,5 +2306,5 @@ test = ["big-O", "importlib-resources", "jaraco.functools", "jaraco.itertools",
"jaraco.itertools", [metadata] lock-version = "2.0" -python-versions = "^3.8" -content-hash = "0d96427eac02294dfb7ce6770111c88343202c9342ad3a9d807aeec52e7be425" +python-versions = ">=3.8,<3.12" +content-hash = "8fc9e4f9972418fec948be2960d7454911de3e854f7740f16d89e0f454f080df" diff --git a/pyproject.toml b/pyproject.toml index 94ce5f9a0..b7c7011f2 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -21,7 +21,7 @@ testpaths = ["test"] python_files = ["test_*.py"] [tool.poetry.dependencies] -python = "^3.8" +python = ">=3.8,<3.12" pymongo = {extras = ["srv"], version = "^4.0.0"} requests = {extras = ["crypto"], version = "^2.31.0"} celery = {extras = ["tblib"], version = "5.4.0"} diff --git a/splunk_connect_for_snmp/enrich/tasks.py b/splunk_connect_for_snmp/enrich/tasks.py index 0c5d03002..18b78bd62 100644 --- a/splunk_connect_for_snmp/enrich/tasks.py +++ b/splunk_connect_for_snmp/enrich/tasks.py @@ -96,16 +96,8 @@ def enrich(self, result): updates = [] attribute_updates = [] - current_target = targets_collection.find_one( - {"address": address}, {"target": True, "sysUpTime": True} - ) - if current_target is None: - logger.info(f"First time for {address}") - current_target = {"address": address} - else: - logger.info(f"Not first time for {address}") + current_target = get_current_target(address, targets_collection) - # TODO: Compare the ts field with the lastmodified time of record and only update if we are newer check_restart(current_target, result["result"], targets_collection, address) logger.info(f"After check_restart for {address}") # First write back to DB new/changed data @@ -136,46 +128,17 @@ def enrich(self, result): upsert=True, ) new_fields = [] - for field_key, field_value in group_data["fields"].items(): - field_key_hash = field_key.replace(".", "|") - field_value["name"] = field_key - cv = None - if current_attributes and field_key_hash in current_attributes.get( - "fields", {} - ): - cv = current_attributes["fields"][field_key_hash] - - # if new field_value is different than the previous one, update - if cv and cv != field_value: - # modifed - attribute_updates.append( - {"$set": {f"fields.{field_key_hash}": field_value}} - ) - - elif cv: - # unchanged - pass - else: - # new - new_fields.append({"$set": {f"fields.{field_key_hash}": field_value}}) - if field_key in TRACKED_F: - updates.append( - {"$set": {f"state.{field_key.replace('.', '|')}": field_value}} - ) - - if len(updates) >= MONGO_UPDATE_BATCH_THRESHOLD: - targets_collection.update_one( - {"address": address}, updates, upsert=True - ) - updates.clear() - - if len(attribute_updates) >= MONGO_UPDATE_BATCH_THRESHOLD: - attributes_collection.update_one( - {"address": address, "group_key_hash": group_key_hash}, - attribute_updates, - upsert=True, - ) - attribute_updates.clear() + set_attribute_updates( + address, + attribute_updates, + attributes_collection, + current_attributes, + group_data, + group_key_hash, + new_fields, + targets_collection, + updates, + ) if new_fields: attributes_bulk_write_operations.append( UpdateOne( @@ -186,15 +149,14 @@ def enrich(self, result): ) new_fields.clear() - if updates: - targets_collection.update_one({"address": address}, updates, upsert=True) - updates.clear() - if attribute_updates: - attributes_collection.update_one( - {"address": address, "group_key_hash": group_key_hash}, - attribute_updates, - ) - attribute_updates.clear() + update_collections( + address, + attribute_updates, + attributes_collection, + group_key_hash, + targets_collection, + updates, + ) # Now add back any 
     if current_attributes:
@@ -203,6 +165,23 @@ def enrich(self, result):
             if attribute_group_id in result["result"]:
                 snmp_object = result["result"][attribute_group_id]
                 enrich_metric_with_fields_from_db(snmp_object, fields)
+    bulk_write_attributes(attributes_bulk_write_operations, attributes_collection)
+    return result
+
+
+def get_current_target(address, targets_collection):
+    current_target = targets_collection.find_one(
+        {"address": address}, {"target": True, "sysUpTime": True}
+    )
+    if current_target is None:
+        logger.info(f"First time for {address}")
+        current_target = {"address": address}
+    else:
+        logger.info(f"Not first time for {address}")
+    return current_target
+
+
+def bulk_write_attributes(attributes_bulk_write_operations, attributes_collection):
     if attributes_bulk_write_operations:
         logger.debug("Start of bulk_write")
         start = time.time()
@@ -214,7 +193,76 @@ def enrich(self, result):
             f"ELAPSED TIME OF BULK: {end - start} for {len(attributes_bulk_write_operations)} operations"
         )
         logger.debug(f"result api: {bulk_result.bulk_api_result}")
-    return result
+
+
+def update_collections(
+    address,
+    attribute_updates,
+    attributes_collection,
+    group_key_hash,
+    targets_collection,
+    updates,
+):
+    if updates:
+        targets_collection.update_one({"address": address}, updates, upsert=True)
+        updates.clear()
+    if attribute_updates:
+        attributes_collection.update_one(
+            {"address": address, "group_key_hash": group_key_hash},
+            attribute_updates,
+        )
+        attribute_updates.clear()
+
+
+def set_attribute_updates(
+    address,
+    attribute_updates,
+    attributes_collection,
+    current_attributes,
+    group_data,
+    group_key_hash,
+    new_fields,
+    targets_collection,
+    updates,
+):
+    for field_key, field_value in group_data["fields"].items():
+        field_key_hash = field_key.replace(".", "|")
+        field_value["name"] = field_key
+        cv = None
+        if current_attributes and field_key_hash in current_attributes.get(
+            "fields", {}
+        ):
+            cv = current_attributes["fields"][field_key_hash]
+
+        # if the new field_value is different from the previous one, update
+        if cv and cv != field_value:
+            # modified
+            attribute_updates.append(
+                {"$set": {f"fields.{field_key_hash}": field_value}}
+            )
+
+        elif cv:
+            # unchanged
+            pass
+        else:
+            # new
+            new_fields.append({"$set": {f"fields.{field_key_hash}": field_value}})
+        if field_key in TRACKED_F:
+            updates.append(
+                {"$set": {f"state.{field_key.replace('.', '|')}": field_value}}
+            )
+
+        if len(updates) >= MONGO_UPDATE_BATCH_THRESHOLD:
+            targets_collection.update_one({"address": address}, updates, upsert=True)
+            updates.clear()
+
+        if len(attribute_updates) >= MONGO_UPDATE_BATCH_THRESHOLD:
+            attributes_collection.update_one(
+                {"address": address, "group_key_hash": group_key_hash},
+                attribute_updates,
+                upsert=True,
+            )
+            attribute_updates.clear()
 
 
 def enrich_metric_with_fields_from_db(snmp_object, fields_from_db):
diff --git a/test/enrich/test_enrich.py b/test/enrich/test_enrich.py
index a86eb4c66..f512eb417 100644
--- a/test/enrich/test_enrich.py
+++ b/test/enrich/test_enrich.py
@@ -1,14 +1,16 @@
 from unittest import TestCase
-from unittest.mock import patch
+from unittest.mock import MagicMock, patch
 
 from splunk_connect_for_snmp.enrich.tasks import (
     enrich,
     enrich_metric_with_fields_from_db,
+    get_current_target,
+    logger,
 )
 
 attributes = {
     "id": "GROUP1",
-    "address": "192.168.0.1",
+    "address": "192.168.0.1:161",
     "fields": {
         "SNMPv2-MIB|sysDescr": {
             "time": 1234,
@@ -36,7 +38,7 @@
 
 attributes2 = {
     "id": "GROUP2",
-    "address": "192.168.0.1",
+    "address": "192.168.0.1:161",
"fields": { "UDP-MIB|extraAttr": { "time": 1234, @@ -49,7 +51,7 @@ } input_dict = { - "address": "192.168.0.1", + "address": "192.168.0.1:161", "result": { "GROUP1": { "fields": { @@ -92,7 +94,7 @@ input_enrich = { "time": 1676291976.2939305, - "address": "54.91.99.113", + "address": "54.91.99.113:161", "result": { "ENTITY-MIB::int=1": { "metrics": {}, @@ -232,7 +234,7 @@ class TestEnrich(TestCase): @patch("pymongo.collection.Collection.bulk_write") @patch("splunk_connect_for_snmp.enrich.tasks.check_restart") def test_enrich(self, m_check_restart, bulk_write, m_update_one, m_find_one): - current_target = {"address": "192.168.0.1"} + current_target = {"address": "192.168.0.1:161"} m_find_one.side_effect = [current_target, True, attributes, attributes2, {}] result = enrich(input_dict) @@ -281,7 +283,7 @@ def test_enrich(self, m_check_restart, bulk_write, m_update_one, m_find_one): result["result"]["GROUP2"]["fields"]["UDP-MIB.extraAttr"], ) - self.assertEqual("192.168.0.1", result["address"]) + self.assertEqual("192.168.0.1:161", result["address"]) m_check_restart.assert_called() bulk_write.assert_called() @@ -323,7 +325,7 @@ def test_enrich_no_target( ) bulk_write.assert_called() - self.assertEqual("192.168.0.1", result["address"]) + self.assertEqual("192.168.0.1:161", result["address"]) def test_enrich_metric_with_fields_from_db(self): additional_field = { @@ -480,3 +482,37 @@ def test_enrich_metric_with_fields_no_metrics(self): result = snmp_object.copy() enrich_metric_with_fields_from_db(snmp_object, additional_field) self.assertEqual(result, snmp_object) + + def test_get_current_target(self): + address = "127.0.0.1:161" + targets_collection = MagicMock() + targets_collection.find_one.side_effect = [ + address, + True, + attributes, + attributes2, + {}, + ] + with self.assertLogs(logger, level="INFO") as logs: + current_address = get_current_target(address, targets_collection) + self.assertEqual("127.0.0.1:161", current_address) + self.assertEqual( + [ + "INFO:splunk_connect_for_snmp.enrich.tasks:Not first time for 127.0.0.1:161" + ], + logs.output, + ) + + def test_get_current_target_empty_find(self): + address = "127.0.0.1:161" + targets_collection = MagicMock() + targets_collection.find_one.return_value = None + with self.assertLogs(logger, level="INFO") as logs: + current_address = get_current_target(address, targets_collection) + self.assertEqual({"address": "127.0.0.1:161"}, current_address) + self.assertEqual( + [ + "INFO:splunk_connect_for_snmp.enrich.tasks:First time for 127.0.0.1:161" + ], + logs.output, + )