Skip to content

Commit

Permalink
Allow to skip the security scan
Browse files Browse the repository at this point in the history
This patch allows users to skip the initial security scan when
registering an IP address with the firewall.
  • Loading branch information
lkiesow committed Nov 29, 2023
1 parent 08bf15c commit 1574ccf
Showing 1 changed file with 85 additions and 78 deletions.
163 changes: 85 additions & 78 deletions deterrersapi/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,126 +6,130 @@ class Deterrers:
__token = None

timeout = 30
'''Request timeout in seconds'''
"""Request timeout in seconds"""

def __init__(self, base_url: str, token: str) -> None:
'''Initialize DETERRERS client.
"""Initialize DETERRERS client.
:param base_url: URL of the DETERRERS installation without path
:type base_url: str
:param token: API token to use for authentication
:type token: str
'''
self.__base_url = base_url.rstrip('/')
"""
self.__base_url = base_url.rstrip("/")
self.__token = token

def __url(self, path: str) -> str:
'''Build request URL based on API path
"""Build request URL based on API path
:param path: API endpoint
:type path: str
:return: Full URL to API endpoint
:rtype: str
'''
path = path.lstrip('/')
return f'{self.__base_url}/hostadmin/api/{path}'
"""
path = path.lstrip("/")
return f"{self.__base_url}/hostadmin/api/{path}"

def __header(self) -> dict[str, str]:
'''Build authorization and content typüe header for request.
"""Build authorization and content typüe header for request.
:return: Header dict to use in request
:rtype: dict[str, str]
'''
return {'Authorization': f'Token {self.__token}',
'Content-Type': 'application/json'}
"""
return {
"Authorization": f"Token {self.__token}",
"Content-Type": "application/json",
}

def __get(self, path: str, **params):
'''Execute API GET request.
"""Execute API GET request.
:param path: Endpoint to request
:type path: str
:return: Decoded response data or None
'''
response = requests.get(self.__url(path),
headers=self.__header(),
params=params,
timeout=self.timeout)
"""
response = requests.get(
self.__url(path),
headers=self.__header(),
params=params,
timeout=self.timeout,
)
return response.json() if response.status_code == 200 else None

def __patch(self, path: str, data: dict) -> None:
'''Execute API PATCH request
"""Execute API PATCH request
:param path: Endpoint to request
:type path: str
:param data: Data to send
:type data: dict
:raises RuntimeError: if patch did not succeed
'''
"""
url: str = self.__url(path)
response = requests.patch(url,
headers=self.__header(),
json=data,
timeout=self.timeout)
response = requests.patch(
url, headers=self.__header(), json=data, timeout=self.timeout
)
if response.status_code not in [200]:
raise RuntimeError(f'Error updating {data}. Response: {response}')
raise RuntimeError(f"Error updating {data}. Response: {response}")

def __post(self, path: str, data: dict) -> None:
'''Execute API POST request
"""Execute API POST request
:param path: Endpoint to request
:type path: str
:param data: Data to post
:type data: dict
:raises RuntimeError: if adding data did not succeed
'''
"""
url: str = self.__url(path)
response = requests.post(url,
headers=self.__header(),
json=data,
timeout=self.timeout)
response = requests.post(
url, headers=self.__header(), json=data, timeout=self.timeout
)
if response.status_code not in [200]:
raise RuntimeError(f'Error adding {data}. Response: {response}')
raise RuntimeError(f"Error adding {data}. Response: {response}")

def __delete(self, path: str, data: dict) -> None:
'''Execute API DELETE request.
"""Execute API DELETE request.
:param path: Endpoint to request
:type path: str
:param data: Data identifying entity to delete
:type data: dict
:raises RuntimeError: if removal did not succeed
'''
"""
url: str = self.__url(path)
response = requests.delete(url,
headers=self.__header(),
json=data,
timeout=self.timeout)
response = requests.delete(
url, headers=self.__header(), json=data, timeout=self.timeout
)
if response.status_code not in [200, 404]:
raise RuntimeError(f'Error deleting {data}. Response: {response}')
raise RuntimeError(f"Error deleting {data}. Response: {response}")

def hosts(self) -> dict:
'''Get list of hosts added to DETERRERS
"""Get list of hosts added to DETERRERS
:return: List of hosts
:rtype: list
'''
return self.__get('hosts/')
"""
return self.__get("hosts/")

def get(self, ipv4: str):
'''Get information about an ipv4 address from DETERRERS.
"""Get information about an ipv4 address from DETERRERS.
:param ipv4: IPv4 address
:type ipv4: str
:return: Dictionary with information
:rtype: dict
'''
return self.__get('host/', ipv4_addr=ipv4)

def add(self, ipv4: str,
admins: list[str],
profile: None | str = None,
firewall: None | str = None) -> None:
'''Add a new IP address to DETERRERS.
"""
return self.__get("host/", ipv4_addr=ipv4)

def add(
self,
ipv4: str,
admins: list[str],
profile: None | str = None,
firewall: None | str = None,
) -> None:
"""Add a new IP address to DETERRERS.
:param ipv4: IPv4 address
:type ipv4: str
Expand All @@ -137,28 +141,30 @@ def add(self, ipv4: str,
:param firewall: Host firewall. Must be None, one of the UI options
or an empty string (default: None)
:type firewall: None | str
'''
data = {'ipv4_addr': ipv4,
'admin_ids': admins}
"""
data = {"ipv4_addr": ipv4, "admin_ids": admins}
if profile is not None:
data['service_profile'] = profile
data["service_profile"] = profile
if firewall is not None:
data['fw'] = firewall
return self.__post('host/', data)
data["fw"] = firewall
return self.__post("host/", data)

def delete(self, ipv4: str) -> None:
'''Delete IP from DETERRERS.
"""Delete IP from DETERRERS.
:param ipv4: IPv4 address to delete
:type ipv4: str
'''
return self.__delete('host/', {'ipv4_addr': ipv4})

def update(self, ipv4: str,
profile: None | str = None,
firewall: None | str = None,
admins: None | list[str] = None) -> None:
'''Update IP address information in DETERRERS.
"""
return self.__delete("host/", {"ipv4_addr": ipv4})

def update(
self,
ipv4: str,
profile: None | str = None,
firewall: None | str = None,
admins: None | list[str] = None,
) -> None:
"""Update IP address information in DETERRERS.
:param ipv4: IPv4 address to update
:type ipv4: str
Expand All @@ -170,24 +176,25 @@ def update(self, ipv4: str,
:type firewall: str | None
:param admins: List of admins for address
:type admins: list[str] | None
'''
data: dict[str, str | list[str]] = {'ipv4_addr': ipv4}
"""
data: dict[str, str | list[str]] = {"ipv4_addr": ipv4}
if profile is not None:
data['service_profile'] = profile
data["service_profile"] = profile
if firewall is not None:
data['fw'] = firewall
data["fw"] = firewall
if admins is not None:
data['admin_ids'] = admins
return self.__patch('host/', data)
data["admin_ids"] = admins
return self.__patch("host/", data)

def action(self, ipv4: str, action: str) -> None:
'''Activate firewall profile or block IP address in perimeter firewall.
def action(self, ipv4: str, action: str, skip_scan: bool = False) -> None:
"""Activate firewall profile or block IP address in perimeter firewall.
:param ipv4: IPv4 address to update
:type ipv4: str
:param action: Action to take. Can be 'register' or 'block'
:type action: str
'''
data = {'ipv4_addrs': [ipv4],
'action': action}
return self.__post('action/', data)
:param skip_scan: Whether to skip the initial security scan
:type skip_scan: bool
"""
data = {"ipv4_addrs": [ipv4], "action": action, "skip_scan": skip_scan}
return self.__post("action/", data)

0 comments on commit 1574ccf

Please sign in to comment.