-
Notifications
You must be signed in to change notification settings - Fork 2k
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
Showing
7 changed files
with
242 additions
and
235 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,220 @@ | ||
"""Describe methods to utilize the AppProtect resources.""" | ||
|
||
from kubernetes.client import CustomObjectsApi, ApiextensionsV1Api, CoreV1Api | ||
from suite.resources_utils import ensure_item_removal, get_file_contents | ||
from kubernetes import client | ||
from kubernetes.client.rest import ApiException | ||
import pytest | ||
import time | ||
import yaml | ||
import logging | ||
|
||
|
||
def read_ap_custom_resource(custom_objects: CustomObjectsApi, namespace, plural, name) -> object: | ||
""" | ||
Get AppProtect CRD information (kubectl describe output) | ||
:param custom_objects: CustomObjectsApi | ||
:param namespace: The custom resource's namespace | ||
:param plural: the custom resource's plural name | ||
:param name: the custom object's name | ||
:return: object | ||
""" | ||
print(f"Getting info for {name} in namespace {namespace}") | ||
try: | ||
response = custom_objects.get_namespaced_custom_object( | ||
"appprotect.f5.com", "v1beta1", namespace, plural, name | ||
) | ||
return response | ||
|
||
except ApiException: | ||
logging.exception(f"Exception occurred while reading CRD") | ||
raise | ||
|
||
|
||
|
||
def create_ap_waf_policy_from_yaml( | ||
custom_objects: CustomObjectsApi, | ||
yaml_manifest, | ||
namespace, | ||
ap_namespace, | ||
waf_enable, | ||
log_enable, | ||
appolicy, | ||
aplogconf, | ||
logdest, | ||
) -> None: | ||
""" | ||
Create a Policy based on yaml file. | ||
:param custom_objects: CustomObjectsApi | ||
:param yaml_manifest: an absolute path to file | ||
:param namespace: namespace for test resources | ||
:param ap_namespace: namespace for AppProtect resources | ||
:param waf_enable: true/false | ||
:param log_enable: true/false | ||
:param appolicy: AppProtect policy name | ||
:param aplogconf: Logconf name | ||
:param logdest: AP log destination (syslog) | ||
:return: None | ||
""" | ||
with open(yaml_manifest) as f: | ||
dep = yaml.safe_load(f) | ||
try: | ||
dep["spec"]["waf"]["enable"] = waf_enable | ||
dep["spec"]["waf"]["apPolicy"] = f"{ap_namespace}/{appolicy}" | ||
dep["spec"]["waf"]["securityLog"]["enable"] = log_enable | ||
dep["spec"]["waf"]["securityLog"]["apLogConf"] = f"{ap_namespace}/{aplogconf}" | ||
dep["spec"]["waf"]["securityLog"]["logDest"] = f"{logdest}" | ||
|
||
custom_objects.create_namespaced_custom_object( | ||
"k8s.nginx.org", "v1", namespace, "policies", dep | ||
) | ||
print(f"Policy created: {dep}") | ||
except ApiException: | ||
logging.exception(f"Exception occurred while creating Policy: {dep['metadata']['name']}") | ||
raise | ||
|
||
def create_ap_logconf_from_yaml(custom_objects: CustomObjectsApi, yaml_manifest, namespace) -> str: | ||
""" | ||
Create a logconf for AppProtect based on yaml file. | ||
:param custom_objects: CustomObjectsApi | ||
:param yaml_manifest: an absolute path to file | ||
:param namespace: | ||
:return: str | ||
""" | ||
print("Create Ap logconf:") | ||
with open(yaml_manifest) as f: | ||
dep = yaml.safe_load(f) | ||
custom_objects.create_namespaced_custom_object( | ||
"appprotect.f5.com", "v1beta1", namespace, "aplogconfs", dep | ||
) | ||
print(f"AP logconf created with name '{dep['metadata']['name']}'") | ||
return dep["metadata"]["name"] | ||
|
||
|
||
def create_ap_policy_from_yaml(custom_objects: CustomObjectsApi, yaml_manifest, namespace) -> str: | ||
""" | ||
Create a policy for AppProtect based on yaml file. | ||
:param custom_objects: CustomObjectsApi | ||
:param yaml_manifest: an absolute path to file | ||
:param namespace: | ||
:return: str | ||
""" | ||
print("Create AP Policy:") | ||
with open(yaml_manifest) as f: | ||
dep = yaml.safe_load(f) | ||
custom_objects.create_namespaced_custom_object( | ||
"appprotect.f5.com", "v1beta1", namespace, "appolicies", dep | ||
) | ||
print(f"AP Policy created with name '{dep['metadata']['name']}'") | ||
return dep["metadata"]["name"] | ||
|
||
|
||
def create_ap_usersig_from_yaml(custom_objects: CustomObjectsApi, yaml_manifest, namespace) -> str: | ||
""" | ||
Create a UserSig for AppProtect based on yaml file. | ||
:param custom_objects: CustomObjectsApi | ||
:param yaml_manifest: an absolute path to file | ||
:param namespace: | ||
:return: str | ||
""" | ||
print("Create AP UserSig:") | ||
with open(yaml_manifest) as f: | ||
dep = yaml.safe_load(f) | ||
custom_objects.create_namespaced_custom_object( | ||
"appprotect.f5.com", "v1beta1", namespace, "apusersigs", dep | ||
) | ||
print(f"AP UserSig created with name '{dep['metadata']['name']}'") | ||
return dep["metadata"]["name"] | ||
|
||
|
||
def delete_and_create_ap_policy_from_yaml( | ||
custom_objects: CustomObjectsApi, name, yaml_manifest, namespace | ||
) -> None: | ||
""" | ||
Patch a AP Policy based on yaml manifest | ||
:param custom_objects: CustomObjectsApi | ||
:param name: | ||
:param yaml_manifest: an absolute path to file | ||
:param namespace: | ||
:return: | ||
""" | ||
print(f"Update an AP Policy: {name}") | ||
|
||
try: | ||
delete_ap_policy(custom_objects, name, namespace) | ||
create_ap_policy_from_yaml(custom_objects, yaml_manifest, namespace) | ||
except ApiException: | ||
logging.exception(f"Failed with exception while patching AP Policy: {name}") | ||
raise | ||
|
||
|
||
def delete_ap_usersig(custom_objects: CustomObjectsApi, name, namespace) -> None: | ||
""" | ||
Delete a AppProtect usersig. | ||
:param custom_objects: CustomObjectsApi | ||
:param namespace: namespace | ||
:param name: | ||
:return: | ||
""" | ||
print(f"Delete AP UserSig: {name}") | ||
custom_objects.delete_namespaced_custom_object( | ||
"appprotect.f5.com", "v1beta1", namespace, "apusersigs", name | ||
) | ||
ensure_item_removal( | ||
custom_objects.get_namespaced_custom_object, | ||
"appprotect.f5.com", | ||
"v1beta1", | ||
namespace, | ||
"apusersigs", | ||
name, | ||
) | ||
print(f"AP UserSig was removed with name: {name}") | ||
|
||
|
||
def delete_ap_logconf(custom_objects: CustomObjectsApi, name, namespace) -> None: | ||
""" | ||
Delete a AppProtect logconf. | ||
:param custom_objects: CustomObjectsApi | ||
:param namespace: namespace | ||
:param name: | ||
:return: | ||
""" | ||
print(f"Delete AP logconf: {name}") | ||
custom_objects.delete_namespaced_custom_object( | ||
"appprotect.f5.com", "v1beta1", namespace, "aplogconfs", name | ||
) | ||
ensure_item_removal( | ||
custom_objects.get_namespaced_custom_object, | ||
"appprotect.f5.com", | ||
"v1beta1", | ||
namespace, | ||
"aplogconfs", | ||
name, | ||
) | ||
print(f"AP logconf was removed with name: {name}") | ||
|
||
|
||
def delete_ap_policy(custom_objects: CustomObjectsApi, name, namespace) -> None: | ||
""" | ||
Delete a AppProtect policy. | ||
:param custom_objects: CustomObjectsApi | ||
:param namespace: namespace | ||
:param name: | ||
:return: | ||
""" | ||
print(f"Delete a AP policy: {name}") | ||
custom_objects.delete_namespaced_custom_object( | ||
"appprotect.f5.com", "v1beta1", namespace, "appolicies", name | ||
) | ||
ensure_item_removal( | ||
custom_objects.get_namespaced_custom_object, | ||
"appprotect.f5.com", | ||
"v1beta1", | ||
namespace, | ||
"appolicies", | ||
name, | ||
) | ||
time.sleep(3) | ||
print(f"AP policy was removed with name: {name}") | ||
|
Oops, something went wrong.