Skip to content

Commit

Permalink
Moved some functions to IS12Utils
Browse files Browse the repository at this point in the history
  • Loading branch information
jonathan-r-thorpe committed Feb 14, 2024
1 parent fd8c9d3 commit 95fe9e5
Show file tree
Hide file tree
Showing 3 changed files with 455 additions and 639 deletions.
272 changes: 261 additions & 11 deletions nmostesting/IS12Utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
from .NMOSUtils import NMOSUtils

import json
import os
import time

from copy import deepcopy
Expand All @@ -25,6 +26,11 @@
from .GenericTest import NMOSTestException
from .TestHelper import WebsocketWorker, load_resolved_schema

NODE_API_KEY = "node"
CONTROL_API_KEY = "ncp"
MS05_API_KEY = "controlframework"
FEATURE_SETS_KEY = "featuresets"


class MessageTypes(IntEnum):
Command = 0
Expand Down Expand Up @@ -146,17 +152,21 @@ class StandardClassIds(Enum):


class IS12Utils(NMOSUtils):
def __init__(self, url, spec_path, spec_branch):
NMOSUtils.__init__(self, url)
self.spec_branch = spec_branch
self.load_is12_schemas(spec_path)
def __init__(self, apis):
NMOSUtils.__init__(self, apis[NODE_API_KEY]["url"])
self.apis = apis
self.spec_path = self.apis[CONTROL_API_KEY]["spec_path"]
self.spec_branch = self.apis[CONTROL_API_KEY]["spec_branch"]
self._load_is12_schemas()
self.ROOT_BLOCK_OID = 1
self.ncp_websocket = None
self.command_handle = 0
self.expect_notifications = False
self.notifications = []
self.device_model = None
self.class_manager = None

def load_is12_schemas(self, spec_path):
def _load_is12_schemas(self):
"""Load datatype and control class decriptors and create datatype JSON schemas"""
# Load IS-12 schemas
self.schemas = {}
Expand All @@ -165,16 +175,17 @@ def load_is12_schemas(self, spec_path):
"subscription-response-message",
"notification-message"]
for schema_name in schema_names:
self.schemas[schema_name] = load_resolved_schema(spec_path, schema_name + ".json")
self.schemas[schema_name] = load_resolved_schema(self.apis[CONTROL_API_KEY]["spec_path"],
schema_name + ".json")

def open_ncp_websocket(self, test, url):
def open_ncp_websocket(self, test):
"""Create a WebSocket client connection to Node under test. Raises NMOSTestException on error"""
# Reuse socket if connection already established
if self.ncp_websocket and self.ncp_websocket.is_open():
return

# Create a WebSocket connection to NMOS Control Protocol endpoint
self.ncp_websocket = WebsocketWorker(url)
self.ncp_websocket = WebsocketWorker(self.apis[CONTROL_API_KEY]["url"])
self.ncp_websocket.start()

# Give WebSocket client a chance to start and open its connection
Expand All @@ -194,7 +205,26 @@ def close_ncp_websocket(self):
if self.ncp_websocket:
self.ncp_websocket.close()

def validate_is12_schema(self, test, payload, schema_name, context=""):
def validate_reference_datatype_schema(self, test, payload, datatype_name, context=""):
"""Validate payload against reference datatype schema"""
self.validate_schema(test, payload, self.reference_datatype_schemas[datatype_name])

def validate_schema(self, test, payload, schema, context=""):
"""Delegates to validate_schema. Raises NMOSTestExceptions on error"""
if not schema:
raise NMOSTestException(test.FAIL(context + "Missing schema. "))
try:
# Validate the JSON schema is correct
checker = FormatChecker(["ipv4", "ipv6", "uri"])
validate(payload, schema, format_checker=checker)
except ValidationError as e:
raise NMOSTestException(test.FAIL(context + "Schema validation error: " + e.message))
except SchemaError as e:
raise NMOSTestException(test.FAIL(context + "Schema error: " + e.message))

return

def _validate_is12_schema(self, test, payload, schema_name, context=""):
"""Delegates to validate_schema. Raises NMOSTestExceptions on error"""
try:
# Validate the JSON schema is correct
Expand Down Expand Up @@ -245,7 +275,7 @@ def send_command(self, test, command_json):
parsed_message = json.loads(message)

if self.message_type_to_schema_name(parsed_message.get("messageType")):
self.validate_is12_schema(
self._validate_is12_schema(
test,
parsed_message,
self.message_type_to_schema_name(parsed_message["messageType"]),
Expand Down Expand Up @@ -324,7 +354,7 @@ def execute_command(self, test, oid, method_id, arguments):
return response["result"]

def get_property_value(self, test, oid, property_id):
"""Get property from object. Raises NMOSTestException on error"""
"""Get value of property from object. Raises NMOSTestException on error"""
return self.execute_command(test, oid,
NcObjectMethods.GENERIC_GET.value,
{'id': property_id})["value"]
Expand Down Expand Up @@ -542,6 +572,226 @@ def is_manager(self, class_id):
""" Check class id to determine if this is a manager """
return len(class_id) > 1 and class_id[0] == 1 and class_id[1] == 3

def load_reference_resources(self):
"""Load datatype and control class decriptors and create datatype JSON schemas"""
# Calculate paths to MS-05 descriptors
# including Feature Sets specified as additional_paths in test definition
spec_paths = [os.path.join(self.apis[FEATURE_SETS_KEY]["spec_path"], path)
for path in self.apis[FEATURE_SETS_KEY]["repo_paths"]]
spec_paths.append(self.apis[MS05_API_KEY]["spec_path"])
# Root path for primitive datatypes
spec_paths.append('test_data/IS1201')

datatype_paths = []
classes_paths = []
for spec_path in spec_paths:
datatype_path = os.path.abspath(os.path.join(spec_path, 'models/datatypes/'))
if os.path.exists(datatype_path):
datatype_paths.append(datatype_path)
classes_path = os.path.abspath(os.path.join(spec_path, 'models/classes/'))
if os.path.exists(classes_path):
classes_paths.append(classes_path)

# Load class and datatype descriptors
self.reference_class_descriptors = self._load_model_descriptors(classes_paths)

# Load MS-05 datatype descriptors
self.reference_datatype_descriptors = self._load_model_descriptors(datatype_paths)

# Generate MS-05 datatype schemas from MS-05 datatype descriptors
self.reference_datatype_schemas = self.generate_json_schemas(
datatype_descriptors=self.reference_datatype_descriptors,
schema_path=os.path.join(self.apis[CONTROL_API_KEY]["spec_path"], 'APIs/schemas/'))

def _load_model_descriptors(self, descriptor_paths):
descriptors = {}
for descriptor_path in descriptor_paths:
for filename in os.listdir(descriptor_path):
name, extension = os.path.splitext(filename)
if extension == ".json":
with open(os.path.join(descriptor_path, filename), 'r') as json_file:
descriptors[name] = json.load(json_file)

return descriptors

def generate_json_schemas(self, datatype_descriptors, schema_path):
"""Generate datatype schemas from datatype descriptors"""
datatype_schema_names = []
base_schema_path = os.path.abspath(schema_path)
if not os.path.exists(base_schema_path):
os.makedirs(base_schema_path)

for name, descriptor in datatype_descriptors.items():
json_schema = self.descriptor_to_schema(descriptor)
with open(os.path.join(base_schema_path, name + '.json'), 'w') as output_file:
json.dump(json_schema, output_file, indent=4)
datatype_schema_names.append(name)

# Load resolved MS-05 datatype schemas
datatype_schemas = {}
for name in datatype_schema_names:
datatype_schemas[name] = load_resolved_schema(schema_path, name + '.json', path_prefix=False)

return datatype_schemas

def validate_descriptor(self, test, reference, descriptor, context=""):
"""Validate descriptor against reference descriptor. Raises NMOSTestException on error"""
non_normative_keys = ['description']

if isinstance(reference, dict):
reference_keys = set(reference.keys())
descriptor_keys = set(descriptor.keys())

# compare the keys to see if any extra/missing
key_diff = (set(reference_keys) | set(descriptor_keys)) - (set(reference_keys) & set(descriptor_keys))
if len(key_diff) > 0:
error_description = "Missing keys " if set(key_diff) <= set(reference_keys) else "Additional keys "
raise NMOSTestException(test.FAIL(context + error_description + str(key_diff)))
for key in reference_keys:
if key in non_normative_keys and not isinstance(reference[key], dict):
continue
# Check for class ID
if key == 'classId' and isinstance(reference[key], list):
if reference[key] != descriptor[key]:
raise NMOSTestException(test.FAIL(context + "Unexpected ClassId. Expected: "
+ str(reference[key])
+ " actual: " + str(descriptor[key])))
else:
self.validate_descriptor(test, reference[key], descriptor[key], context=context + key + "->")
elif isinstance(reference, list):
if len(reference) > 0 and isinstance(reference[0], dict):
# Convert to dict and validate
references = {item['name']: item for item in reference}
descriptors = {item['name']: item for item in descriptor}

self.validate_descriptor(test, references, descriptors, context)
elif reference != descriptor:
raise NMOSTestException(test.FAIL(context + "Unexpected sequence. Expected: "
+ str(reference)
+ " actual: " + str(descriptor)))
else:
if reference != descriptor:
raise NMOSTestException(test.FAIL(context + 'Expected value: '
+ str(reference)
+ ', actual value: '
+ str(descriptor)))
return

def _get_class_manager_descriptors(self, test, class_manager_oid, property_id):
response = self.get_property_value(test, class_manager_oid, property_id)

if not response:
return None

# Create descriptor dictionary from response array
# Use classId as key if present, otherwise use name
def key_lambda(classId, name): return ".".join(map(str, classId)) if classId else name
descriptors = {key_lambda(r.get('classId'), r['name']): r for r in response}

return descriptors

def query_device_model(self, test):
""" Query Device Model from the Node under test.
self.device_model_metadata set on Device Model validation error.
NMOSTestException raised if unable to query Device Model """
self.open_ncp_websocket(test)
if not self.device_model:
self.device_model = self._nc_object_factory(
test,
StandardClassIds.NCBLOCK.value,
self.ROOT_BLOCK_OID,
"root")

if not self.device_model:
raise NMOSTestException(test.FAIL("Unable to query Device Model"))
return self.device_model

def get_class_manager(self, test):
"""Get the Class Manager queried from the Node under test's Device Model"""
if not self.class_manager:
self.class_manager = self._get_manager(test, StandardClassIds.NCCLASSMANAGER.value)

return self.class_manager

def get_device_manager(self, test):
"""Get the Device Manager queried from the Node under test's Device Model"""
return self._get_manager(test, StandardClassIds.NCDEVICEMANAGER.value)

def _get_manager(self, test, class_id):
self.open_ncp_websocket(test)
device_model = self.query_device_model(test)
members = device_model.find_members_by_class_id(class_id, include_derived=True)

spec_link = "https://specs.amwa.tv/ms-05-02/branches/{}/docs/Managers.html".format(self.spec_branch)

if len(members) == 0:
raise NMOSTestException(test.FAIL("Manager not found in Root Block.", spec_link))

if len(members) > 1:
raise NMOSTestException(test.FAIL("Manager MUST be a singleton.", spec_link))

return members[0]

def _nc_object_factory(self, test, class_id, oid, role):
"""Create NcObject or NcBlock based on class_id"""
# will set self.device_model_error to True if problems encountered
try:
runtime_constraints = self.get_property_value(
test,
oid,
NcObjectProperties.RUNTIME_PROPERTY_CONSTRAINTS.value)

# Check class id to determine if this is a block
if len(class_id) > 1 and class_id[0] == 1 and class_id[1] == 1:
member_descriptors = self.get_property_value(
test,
oid,
NcBlockProperties.MEMBERS.value)

nc_block = NcBlock(class_id, oid, role, member_descriptors, runtime_constraints)

for m in member_descriptors:
child_object = self._nc_object_factory(test, m["classId"], m["oid"], m["role"])
if child_object:
nc_block.add_child_object(child_object)

return nc_block
else:
# Check to determine if this is a Class Manager
if len(class_id) > 2 and class_id[0] == 1 and class_id[1] == 3 and class_id[2] == 2:
class_descriptors = self._get_class_manager_descriptors(
test,
oid,
NcClassManagerProperties.CONTROL_CLASSES.value)

datatype_descriptors = self._get_class_manager_descriptors(
test,
oid,
NcClassManagerProperties.DATATYPES.value)

if not class_descriptors or not datatype_descriptors:
# An error has likely occured
return None

return NcClassManager(class_id,
oid,
role,
class_descriptors,
datatype_descriptors,
runtime_constraints)

return NcObject(class_id, oid, role, runtime_constraints)

except NMOSTestException as e:
raise NMOSTestException(test.FAIL("Error in Device Model " + role + ": " + str(e.args[0].detail)))

def resolve_datatype(self, test, datatype):
"""Resolve datatype to its base type"""
class_manager = self.get_class_manager(test)
if class_manager.datatype_descriptors[datatype].get("parentType"):
return self.resolve_datatype(test, class_manager.datatype_descriptors[datatype].get("parentType"))
return datatype


class NcObject():
def __init__(self, class_id, oid, role, runtime_constraints):
Expand Down
Loading

0 comments on commit 95fe9e5

Please sign in to comment.