Skip to content

Commit

Permalink
Merge pull request #16 from UKB-IT-Sec/15-validation-for-all-device-a…
Browse files Browse the repository at this point in the history
…ttributes-in-validationpy

import devices from csv into database, tesobjects in  ldap_objects
  • Loading branch information
weidenba authored Oct 21, 2024
2 parents 5aa97f6 + d5f1432 commit 7a72c6f
Show file tree
Hide file tree
Showing 8 changed files with 880 additions and 1 deletion.
36 changes: 36 additions & 0 deletions src/helper/database.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
from nac.models import Device


class MacList:
def __init__(self):
self._mac_list = {}
self._initialized = False

def _get_or_create_mac_list(self):
if not self._initialized:
devices = Device.objects.all().values('id', 'appl_NAC_macAddressCAB', 'appl_NAC_macAddressAIR')
for device in devices:
self.update_mac_list(device)
self._initialized = True

def check_existing_mac(self, deviceObject):
self._get_or_create_mac_list()
air_macs = deviceObject.get("appl_NAC_macAddressAIR") or ""
cab_macs = deviceObject.get("appl_NAC_macAddressCAB") or ""

for mac in air_macs.split(",") + cab_macs.split(","):
if mac.strip() in self._mac_list:
return True, self._mac_list[mac]

return False, None

def update_mac_list(self, device):
device_id = device['id']
air_mac = device.get("appl_NAC_macAddressAIR") or ""
cab_macs = device.get("appl_NAC_macAddressCAB") or ""

if air_mac.strip():
self._mac_list[air_mac.strip()] = device_id

if cab_macs:
self._mac_list.update({mac.strip(): device_id for mac in cab_macs.split(',') if mac.strip()})
8 changes: 8 additions & 0 deletions src/helper/filesystem.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,3 +22,11 @@ def get_src_directory():

def get_config_directory():
return get_src_directory().parent / 'config'


def get_resources_directory():
return get_src_directory().parent / 'resources'


def get_existing_path(path):
return Path(path) if Path(path).exists() else None
248 changes: 248 additions & 0 deletions src/nac/management/commands/import_devices_from_csv.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,248 @@
import logging
from os import stat
from csv import DictReader, DictWriter
from django.core.management.base import BaseCommand, CommandError
from django.db import transaction
from django.core.exceptions import ValidationError, ObjectDoesNotExist
from nac.models import Device, AuthorizationGroup, DeviceRoleProd, DeviceRoleInst
from nac.forms import DeviceForm
from helper.logging import setup_console_logger
from helper.filesystem import get_resources_directory, get_existing_path
from helper.database import MacList


DEFAULT_SOURCE_FILE = get_resources_directory() / 'ldapObjects.csv'
SAVE_FILE = get_resources_directory() / "invalid_devices.csv"


class Command(BaseCommand):

help = 'Import devices from CSV file'

def add_arguments(self, parser):
parser.add_argument(
'-f', '--csv_file',
default=DEFAULT_SOURCE_FILE,
help='use a specific csv file [src/ldapObjects.csv]'
)
parser.add_argument(
'-a', '--auth_group',
default='DefaultAG',
help='specify the Device Authorization Group'
)
parser.add_argument(
'-u', '--update',
action='store_true',
help='specify if existing Devices should be updated'
)

def handle(self, *args, **options):
setup_console_logger(options['verbosity'])
self.source_file = get_existing_path(options['csv_file'])
self.update = options['update']
self.mac_list = MacList()
if not self.source_file:
logging.error(
f"The path '{options['csv_file']}'does not exist.")
raise CommandError(
f"The path '{options['csv_file']}' does not exist.")
self.auth_group = self.check_valid_auth_group(options['auth_group'])
if not self.auth_group:
logging.error(
f"Invalid auth group '{options['auth_group']}'.")
raise CommandError(
f"Invalid auth group '{options['auth_group']}'.")

self.clear_invalid_devices_file()
self.read_csv()

def check_valid_auth_group(self, auth_group):
exists = AuthorizationGroup.objects.filter(name=auth_group).exists()
if not exists:
logging.error(
f"Authorization Group-Object: {auth_group} not in Database")
return auth_group if exists else None

def clear_invalid_devices_file(self):
try:
with open(SAVE_FILE, "w"):
logging.info(f"Removing all entries in {SAVE_FILE}")
except Exception as e:
logging.error(
f"Removing all entries in {SAVE_FILE} FAILED -> {e}"
)

def read_csv(self):
try:
with open(self.source_file, "r", newline="") as csvfile:
logging.info(f"Reading {self.source_file}")
reader = DictReader(csvfile, delimiter=";")
for row in reader:
self.handle_deviceObject(row)
logging.debug(f"Reading {self.source_file}: SUCCESSFUL")
except Exception as e:
logging.error(f"Reading {self.source_file}: FAILED -> {e}")

def handle_deviceObject(self, deviceObject):
try:
device = self.check_device(deviceObject)
if device:
self.add_device_to_db(device)
except ValidationError as e:
logging.error(f"Invalid Device -> {e}")
self.save_invalid_devices(deviceObject)
except Exception as e:
logging.error(f"Error: Handling device Object failed -> {e}")

def check_device(self, deviceObject):
logging.info(f"Checking validity of device "
f"{deviceObject.get('appl-NAC-Hostname')}")
try:
if deviceObject.get('objectClass') != 'appl-NAC-Device':
raise ValidationError(
f"Invalid Object-type! EXPECTED: appl-NAC-Device <->"
f" ACTUAL: {deviceObject.get('objectClass')}")
with transaction.atomic():
auth_group = AuthorizationGroup.objects.get(
name=self.auth_group
)
try:
deviceRoleProd = DeviceRoleProd.objects.get(
name=deviceObject.get("appl-NAC-DeviceRoleProd"))
except ObjectDoesNotExist:
raise ValidationError(
f"DeviceRoleProd: "
f"{deviceObject.get('appl-NAC-DeviceRoleProd')} "
f"not in Database")
try:
deviceRoleInst = DeviceRoleInst.objects.get(
name=deviceObject.get("appl-NAC-DeviceRoleInst"))
except ObjectDoesNotExist:
raise ValidationError(
f"DeviceRoleInst: "
f"{deviceObject.get('appl-NAC-DeviceRoleInst')} "
f"not in Database")
if deviceRoleProd not in auth_group.DeviceRoleProd.all():
raise ValidationError(
f"DeviceRoleProd: {deviceRoleProd} "
f"not in authorization group: {auth_group}")
elif deviceRoleInst not in auth_group.DeviceRoleInst.all():
raise ValidationError(
f"DeviceRoleInst: {deviceRoleInst} "
f"not in authorization group: {auth_group}")

device_data = {
"name": deviceObject.get("appl-NAC-Hostname"),
"authorization_group": auth_group,
"appl_NAC_DeviceRoleProd": deviceRoleProd,
"appl_NAC_DeviceRoleInst": deviceRoleInst,
"appl_NAC_FQDN": deviceObject.get("appl-NAC-FQDN"),
"appl_NAC_Hostname": deviceObject.get("appl-NAC-Hostname"),
"appl_NAC_Active": self.str_to_bool(
deviceObject.get("appl-NAC-Active")
),
"appl_NAC_ForceDot1X": self.str_to_bool(
deviceObject.get("appl-NAC-ForceDot1X")
),
"appl_NAC_Install": self.str_to_bool(
deviceObject.get("appl-NAC-Install")
),
"appl_NAC_AllowAccessCAB": self.str_to_bool(
deviceObject.get("appl-NAC-AllowAccessCAB")
),
"appl_NAC_AllowAccessAIR": self.str_to_bool(
deviceObject.get("appl-NAC-AllowAccessAIR")
),
"appl_NAC_AllowAccessVPN": self.str_to_bool(
deviceObject.get("appl-NAC-AllowAccessVPN")
),
"appl_NAC_AllowAccessCEL": self.str_to_bool(
deviceObject.get("appl-NAC-AllowAccessCEL")
),
"appl_NAC_macAddressAIR": deviceObject.get(
"appl-NAC-macAddressAIR"
),
"appl_NAC_macAddressCAB": deviceObject.get(
"appl-NAC-macAddressCAB"
),
"appl_NAC_Certificate": deviceObject.get(
"appl-NAC-Certificate"
),
"synchronized": self.str_to_bool(
deviceObject.get("synchronized")
)
}
device_form = DeviceForm(device_data)
if device_form.is_valid():
logging.debug(f"Device {device_data.get('name')} is valid")
exists, device_id = self.mac_list.check_existing_mac(device_form.cleaned_data)
if exists:
logging.debug(f"Device {deviceObject.get('appl-NAC-Hostname')} already exists")
if self.update:
logging.debug(f"Updating Device {deviceObject.get('appl-NAC-Hostname')}")
Device.objects.filter(id=device_id).update(**device_form.cleaned_data)
return None
else:
raise ValidationError(f"Device {deviceObject.get('appl-NAC-Hostname')} exists and will not get updated")
return device_form.cleaned_data
else:
logging.error(
f"Device {device_data.get('name')} is not valid"
)
for field, errors in device_form.errors.items():
for reason in errors:
logging.error(f"Field: {field} - Error: {reason}")
print(f"Field: {field} - Error: {reason}")
raise ValidationError("Invalid Device")
except ValidationError:
raise
except Exception as e:
logging.error(
f"Checking validity of device {deviceObject.get('name')}: "
f"FAILED -> {e}"
)
raise

def str_to_bool(self, input_value):
return not (input_value in {False, 'False', 'false', 'FALSE'})

def add_device_to_db(self, deviceObject_valid):
logging.info(
f"Import device {deviceObject_valid.get('name')} to database"
)
try:
with transaction.atomic():
new_device = Device.objects.create(**deviceObject_valid)
self.mac_list.update_mac_list(new_device.__dict__)
logging.debug(
f"Import device {deviceObject_valid.get('name')} to "
f"database: SUCCESSFUL"
)
return True
except Exception as e:
logging.error(
f"Import device {deviceObject_valid.get('name')} to database: "
f"FAILED -> {e}"
)
return False

def save_invalid_devices(self, deviceObject_invalid):
try:
column_header = deviceObject_invalid.keys()
with open(SAVE_FILE, 'a', newline="") as csvfile:
logging.info(f"Writing invalid device to {SAVE_FILE}")
writer = DictWriter(
csvfile, fieldnames=column_header, delimiter=";"
)
if stat(SAVE_FILE).st_size == 0:
writer.writeheader()
writer.writerows([deviceObject_invalid])
logging.debug(
f"Writing invalid device to {SAVE_FILE}: "
f"SUCCESSFUL"
)
except Exception as e:
logging.error(
f"Writing invalid device to "
f"{SAVE_FILE}: FAILED -> {e}"
)
1 change: 1 addition & 0 deletions src/nac/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ def get_absolute_url(self):
return reverse("device_detail", kwargs={"pk": self.pk})

def format_mac(self, mac):
mac = mac.upper()
return ":".join(mac[i:i+2] for i in range(0, 12, 2))

def get_appl_NAC_macAddressAIR(self):
Expand Down
Empty file.
Loading

0 comments on commit 7a72c6f

Please sign in to comment.