Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

import devices from csv into database, tesobjects in ldap_objects -> not final #16

Merged
merged 33 commits into from
Oct 21, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
33 commits
Select commit Hold shift + click to select a range
d9a16cd
import devices from csv into database, tesobjects in ldap_objects ->…
Dan1elRoos Aug 7, 2024
485c74a
Update test_device_page.py
Dan1elRoos Aug 8, 2024
48b649c
adding setup and config files
Dan1elRoos Aug 14, 2024
2d84b37
added helper-functions, changed Modulname import for test_device_page
Dan1elRoos Aug 22, 2024
d3147a9
Merge main to current local branch
Dan1elRoos Aug 22, 2024
e41df1b
Fix Whitespace issues
Dan1elRoos Aug 22, 2024
a31924f
Merge branch '15-validation-for-all-device-attributes-in-validationpy…
Dan1elRoos Aug 22, 2024
931a5a0
Revert "import devices from csv into database, tesobjects in ldap_ob…
Dan1elRoos Aug 22, 2024
2eb9c55
Fixing static Testvariables, minor changes for CommandClass
Dan1elRoos Aug 23, 2024
382f906
fixed flake8, testcoverage at 100
Dan1elRoos Aug 23, 2024
5c3f4f9
fixing inconsistent code
Dan1elRoos Aug 23, 2024
1a1b285
fixed syntaxerror for django ci checks
Dan1elRoos Aug 23, 2024
c98df61
fixed minor error
Dan1elRoos Aug 25, 2024
cda3dc4
fix minor errors, improve compatibility
Dan1elRoos Sep 4, 2024
867ef99
rename
Dan1elRoos Sep 4, 2024
1fb125d
remove unnecessary test-resources
Dan1elRoos Sep 4, 2024
efa6fbb
Merge branch '19-deviceroleprod-equals-security-group' into 15-valida…
Dan1elRoos Sep 6, 2024
d83a952
adjust import script to DeviceRoleProd, parameterize area attribute
Dan1elRoos Sep 6, 2024
8a7deec
Merge branch 'main' into 15-validation-for-all-device-attributes-in-v…
weidenba Sep 10, 2024
49e7fdd
Merge branch '21-app-nac-deviceroleinst-selector' into 15-validation-…
Dan1elRoos Sep 11, 2024
d4de23c
Merge branch '21-app-nac-deviceroleinst-selector' into 15-validation-…
Dan1elRoos Sep 11, 2024
a4e2611
adjust device attributes, add attribute validation
Dan1elRoos Sep 12, 2024
423d9d9
Merge branch 'main' of https://github.com/UKB-IT-Sec/NAC-Service-Port…
Dan1elRoos Sep 12, 2024
dec7fc6
fix flake8
Dan1elRoos Sep 12, 2024
4251615
Merge branch 'main' into 15-validation-for-all-device-attributes-in-v…
weidenba Sep 16, 2024
3403609
apply requested changes
Dan1elRoos Sep 25, 2024
a58eae7
fix errors
Dan1elRoos Oct 1, 2024
ba8c5ca
fix flake8
Dan1elRoos Oct 2, 2024
2b7f94a
add device existence check with update option
Dan1elRoos Oct 8, 2024
44de2af
add database check for macAdresses
Dan1elRoos Oct 18, 2024
8a6b40c
add pytest for helper database
Dan1elRoos Oct 18, 2024
f37a0b0
create Class for macList in helper.database
Dan1elRoos Oct 21, 2024
d5f1432
Merge branch 'main' into 15-validation-for-all-device-attributes-in-v…
weidenba Oct 21, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 36 additions & 0 deletions src/helper/database.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
from nac.models import Device


class MacList:
def __init__(self):
self._mac_list = {}
self._initialized = False

def _get_or_create_mac_list(self):
if not self._initialized:
devices = Device.objects.all().values('id', 'appl_NAC_macAddressCAB', 'appl_NAC_macAddressAIR')
for device in devices:
self.update_mac_list(device)
self._initialized = True

def check_existing_mac(self, deviceObject):
self._get_or_create_mac_list()
air_macs = deviceObject.get("appl_NAC_macAddressAIR") or ""
cab_macs = deviceObject.get("appl_NAC_macAddressCAB") or ""

for mac in air_macs.split(",") + cab_macs.split(","):
if mac.strip() in self._mac_list:
return True, self._mac_list[mac]

return False, None

def update_mac_list(self, device):
device_id = device['id']
air_mac = device.get("appl_NAC_macAddressAIR") or ""
cab_macs = device.get("appl_NAC_macAddressCAB") or ""

if air_mac.strip():
self._mac_list[air_mac.strip()] = device_id

if cab_macs:
self._mac_list.update({mac.strip(): device_id for mac in cab_macs.split(',') if mac.strip()})
8 changes: 8 additions & 0 deletions src/helper/filesystem.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,3 +22,11 @@ def get_src_directory():

def get_config_directory():
return get_src_directory().parent / 'config'


def get_resources_directory():
return get_src_directory().parent / 'resources'


def get_existing_path(path):
return Path(path) if Path(path).exists() else None
248 changes: 248 additions & 0 deletions src/nac/management/commands/import_devices_from_csv.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,248 @@
import logging
from os import stat
from csv import DictReader, DictWriter
from django.core.management.base import BaseCommand, CommandError
from django.db import transaction
from django.core.exceptions import ValidationError, ObjectDoesNotExist
from nac.models import Device, AuthorizationGroup, DeviceRoleProd, DeviceRoleInst
from nac.forms import DeviceForm
from helper.logging import setup_console_logger
from helper.filesystem import get_resources_directory, get_existing_path
from helper.database import MacList


DEFAULT_SOURCE_FILE = get_resources_directory() / 'ldapObjects.csv'
SAVE_FILE = get_resources_directory() / "invalid_devices.csv"


class Command(BaseCommand):

help = 'Import devices from CSV file'

def add_arguments(self, parser):
parser.add_argument(
'-f', '--csv_file',
default=DEFAULT_SOURCE_FILE,
help='use a specific csv file [src/ldapObjects.csv]'
)
parser.add_argument(
'-a', '--auth_group',
default='DefaultAG',
help='specify the Device Authorization Group'
)
parser.add_argument(
'-u', '--update',
action='store_true',
help='specify if existing Devices should be updated'
)

def handle(self, *args, **options):
setup_console_logger(options['verbosity'])
self.source_file = get_existing_path(options['csv_file'])
self.update = options['update']
self.mac_list = MacList()
if not self.source_file:
logging.error(
f"The path '{options['csv_file']}'does not exist.")
raise CommandError(
f"The path '{options['csv_file']}' does not exist.")
self.auth_group = self.check_valid_auth_group(options['auth_group'])
if not self.auth_group:
logging.error(
f"Invalid auth group '{options['auth_group']}'.")
raise CommandError(
f"Invalid auth group '{options['auth_group']}'.")

self.clear_invalid_devices_file()
self.read_csv()

def check_valid_auth_group(self, auth_group):
exists = AuthorizationGroup.objects.filter(name=auth_group).exists()
if not exists:
logging.error(
f"Authorization Group-Object: {auth_group} not in Database")
return auth_group if exists else None

def clear_invalid_devices_file(self):
try:
with open(SAVE_FILE, "w"):
logging.info(f"Removing all entries in {SAVE_FILE}")
except Exception as e:
logging.error(
f"Removing all entries in {SAVE_FILE} FAILED -> {e}"
)

def read_csv(self):
try:
with open(self.source_file, "r", newline="") as csvfile:
logging.info(f"Reading {self.source_file}")
reader = DictReader(csvfile, delimiter=";")
for row in reader:
self.handle_deviceObject(row)
logging.debug(f"Reading {self.source_file}: SUCCESSFUL")
except Exception as e:
logging.error(f"Reading {self.source_file}: FAILED -> {e}")

def handle_deviceObject(self, deviceObject):
try:
device = self.check_device(deviceObject)
if device:
self.add_device_to_db(device)
except ValidationError as e:
logging.error(f"Invalid Device -> {e}")
self.save_invalid_devices(deviceObject)
except Exception as e:
logging.error(f"Error: Handling device Object failed -> {e}")

def check_device(self, deviceObject):
logging.info(f"Checking validity of device "
f"{deviceObject.get('appl-NAC-Hostname')}")
try:
if deviceObject.get('objectClass') != 'appl-NAC-Device':
raise ValidationError(
f"Invalid Object-type! EXPECTED: appl-NAC-Device <->"
f" ACTUAL: {deviceObject.get('objectClass')}")
with transaction.atomic():
auth_group = AuthorizationGroup.objects.get(
name=self.auth_group
)
try:
deviceRoleProd = DeviceRoleProd.objects.get(
name=deviceObject.get("appl-NAC-DeviceRoleProd"))
except ObjectDoesNotExist:
raise ValidationError(
f"DeviceRoleProd: "
f"{deviceObject.get('appl-NAC-DeviceRoleProd')} "
f"not in Database")
try:
deviceRoleInst = DeviceRoleInst.objects.get(
name=deviceObject.get("appl-NAC-DeviceRoleInst"))
except ObjectDoesNotExist:
raise ValidationError(
f"DeviceRoleInst: "
f"{deviceObject.get('appl-NAC-DeviceRoleInst')} "
f"not in Database")
if deviceRoleProd not in auth_group.DeviceRoleProd.all():
raise ValidationError(
f"DeviceRoleProd: {deviceRoleProd} "
f"not in authorization group: {auth_group}")
elif deviceRoleInst not in auth_group.DeviceRoleInst.all():
raise ValidationError(
f"DeviceRoleInst: {deviceRoleInst} "
f"not in authorization group: {auth_group}")

device_data = {
"name": deviceObject.get("appl-NAC-Hostname"),
"authorization_group": auth_group,
"appl_NAC_DeviceRoleProd": deviceRoleProd,
"appl_NAC_DeviceRoleInst": deviceRoleInst,
"appl_NAC_FQDN": deviceObject.get("appl-NAC-FQDN"),
"appl_NAC_Hostname": deviceObject.get("appl-NAC-Hostname"),
"appl_NAC_Active": self.str_to_bool(
deviceObject.get("appl-NAC-Active")
),
"appl_NAC_ForceDot1X": self.str_to_bool(
deviceObject.get("appl-NAC-ForceDot1X")
),
"appl_NAC_Install": self.str_to_bool(
deviceObject.get("appl-NAC-Install")
),
"appl_NAC_AllowAccessCAB": self.str_to_bool(
deviceObject.get("appl-NAC-AllowAccessCAB")
),
"appl_NAC_AllowAccessAIR": self.str_to_bool(
deviceObject.get("appl-NAC-AllowAccessAIR")
),
"appl_NAC_AllowAccessVPN": self.str_to_bool(
deviceObject.get("appl-NAC-AllowAccessVPN")
),
"appl_NAC_AllowAccessCEL": self.str_to_bool(
deviceObject.get("appl-NAC-AllowAccessCEL")
),
"appl_NAC_macAddressAIR": deviceObject.get(
"appl-NAC-macAddressAIR"
),
"appl_NAC_macAddressCAB": deviceObject.get(
"appl-NAC-macAddressCAB"
),
"appl_NAC_Certificate": deviceObject.get(
"appl-NAC-Certificate"
),
"synchronized": self.str_to_bool(
deviceObject.get("synchronized")
)
}
device_form = DeviceForm(device_data)
if device_form.is_valid():
logging.debug(f"Device {device_data.get('name')} is valid")
exists, device_id = self.mac_list.check_existing_mac(device_form.cleaned_data)
if exists:
logging.debug(f"Device {deviceObject.get('appl-NAC-Hostname')} already exists")
if self.update:
logging.debug(f"Updating Device {deviceObject.get('appl-NAC-Hostname')}")
Device.objects.filter(id=device_id).update(**device_form.cleaned_data)
return None
else:
raise ValidationError(f"Device {deviceObject.get('appl-NAC-Hostname')} exists and will not get updated")
return device_form.cleaned_data
else:
logging.error(
f"Device {device_data.get('name')} is not valid"
)
for field, errors in device_form.errors.items():
for reason in errors:
logging.error(f"Field: {field} - Error: {reason}")
print(f"Field: {field} - Error: {reason}")
raise ValidationError("Invalid Device")
except ValidationError:
raise
except Exception as e:
logging.error(
f"Checking validity of device {deviceObject.get('name')}: "
f"FAILED -> {e}"
)
raise

def str_to_bool(self, input_value):
return not (input_value in {False, 'False', 'false', 'FALSE'})

def add_device_to_db(self, deviceObject_valid):
logging.info(
f"Import device {deviceObject_valid.get('name')} to database"
)
try:
with transaction.atomic():
new_device = Device.objects.create(**deviceObject_valid)
self.mac_list.update_mac_list(new_device.__dict__)
logging.debug(
f"Import device {deviceObject_valid.get('name')} to "
f"database: SUCCESSFUL"
)
return True
except Exception as e:
logging.error(
f"Import device {deviceObject_valid.get('name')} to database: "
f"FAILED -> {e}"
)
return False

def save_invalid_devices(self, deviceObject_invalid):
try:
column_header = deviceObject_invalid.keys()
with open(SAVE_FILE, 'a', newline="") as csvfile:
logging.info(f"Writing invalid device to {SAVE_FILE}")
writer = DictWriter(
csvfile, fieldnames=column_header, delimiter=";"
)
if stat(SAVE_FILE).st_size == 0:
writer.writeheader()
writer.writerows([deviceObject_invalid])
logging.debug(
f"Writing invalid device to {SAVE_FILE}: "
f"SUCCESSFUL"
)
except Exception as e:
logging.error(
f"Writing invalid device to "
f"{SAVE_FILE}: FAILED -> {e}"
)
1 change: 1 addition & 0 deletions src/nac/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ def get_absolute_url(self):
return reverse("device_detail", kwargs={"pk": self.pk})

def format_mac(self, mac):
mac = mac.upper()
return ":".join(mac[i:i+2] for i in range(0, 12, 2))

def get_appl_NAC_macAddressAIR(self):
Expand Down
Empty file.
Loading
Loading