diff --git a/nmostesting/IS12Utils.py b/nmostesting/IS12Utils.py index 6dd85348..90917026 100644 --- a/nmostesting/IS12Utils.py +++ b/nmostesting/IS12Utils.py @@ -15,6 +15,7 @@ from .NMOSUtils import NMOSUtils import json +import os import time from copy import deepcopy @@ -25,6 +26,11 @@ from .GenericTest import NMOSTestException from .TestHelper import WebsocketWorker, load_resolved_schema +NODE_API_KEY = "node" +CONTROL_API_KEY = "ncp" +MS05_API_KEY = "controlframework" +FEATURE_SETS_KEY = "featuresets" + class MessageTypes(IntEnum): Command = 0 @@ -146,17 +152,21 @@ class StandardClassIds(Enum): class IS12Utils(NMOSUtils): - def __init__(self, url, spec_path, spec_branch): - NMOSUtils.__init__(self, url) - self.spec_branch = spec_branch - self.load_is12_schemas(spec_path) + def __init__(self, apis): + NMOSUtils.__init__(self, apis[NODE_API_KEY]["url"]) + self.apis = apis + self.spec_path = self.apis[CONTROL_API_KEY]["spec_path"] + self.spec_branch = self.apis[CONTROL_API_KEY]["spec_branch"] + self._load_is12_schemas() self.ROOT_BLOCK_OID = 1 self.ncp_websocket = None self.command_handle = 0 self.expect_notifications = False self.notifications = [] + self.device_model = None + self.class_manager = None - def load_is12_schemas(self, spec_path): + def _load_is12_schemas(self): """Load datatype and control class decriptors and create datatype JSON schemas""" # Load IS-12 schemas self.schemas = {} @@ -165,16 +175,17 @@ def load_is12_schemas(self, spec_path): "subscription-response-message", "notification-message"] for schema_name in schema_names: - self.schemas[schema_name] = load_resolved_schema(spec_path, schema_name + ".json") + self.schemas[schema_name] = load_resolved_schema(self.apis[CONTROL_API_KEY]["spec_path"], + schema_name + ".json") - def open_ncp_websocket(self, test, url): + def open_ncp_websocket(self, test): """Create a WebSocket client connection to Node under test. Raises NMOSTestException on error""" # Reuse socket if connection already established if self.ncp_websocket and self.ncp_websocket.is_open(): return # Create a WebSocket connection to NMOS Control Protocol endpoint - self.ncp_websocket = WebsocketWorker(url) + self.ncp_websocket = WebsocketWorker(self.apis[CONTROL_API_KEY]["url"]) self.ncp_websocket.start() # Give WebSocket client a chance to start and open its connection @@ -194,7 +205,26 @@ def close_ncp_websocket(self): if self.ncp_websocket: self.ncp_websocket.close() - def validate_is12_schema(self, test, payload, schema_name, context=""): + def validate_reference_datatype_schema(self, test, payload, datatype_name, context=""): + """Validate payload against reference datatype schema""" + self.validate_schema(test, payload, self.reference_datatype_schemas[datatype_name]) + + def validate_schema(self, test, payload, schema, context=""): + """Delegates to validate_schema. Raises NMOSTestExceptions on error""" + if not schema: + raise NMOSTestException(test.FAIL(context + "Missing schema. ")) + try: + # Validate the JSON schema is correct + checker = FormatChecker(["ipv4", "ipv6", "uri"]) + validate(payload, schema, format_checker=checker) + except ValidationError as e: + raise NMOSTestException(test.FAIL(context + "Schema validation error: " + e.message)) + except SchemaError as e: + raise NMOSTestException(test.FAIL(context + "Schema error: " + e.message)) + + return + + def _validate_is12_schema(self, test, payload, schema_name, context=""): """Delegates to validate_schema. Raises NMOSTestExceptions on error""" try: # Validate the JSON schema is correct @@ -245,7 +275,7 @@ def send_command(self, test, command_json): parsed_message = json.loads(message) if self.message_type_to_schema_name(parsed_message.get("messageType")): - self.validate_is12_schema( + self._validate_is12_schema( test, parsed_message, self.message_type_to_schema_name(parsed_message["messageType"]), @@ -324,7 +354,7 @@ def execute_command(self, test, oid, method_id, arguments): return response["result"] def get_property_value(self, test, oid, property_id): - """Get property from object. Raises NMOSTestException on error""" + """Get value of property from object. Raises NMOSTestException on error""" return self.execute_command(test, oid, NcObjectMethods.GENERIC_GET.value, {'id': property_id})["value"] @@ -542,6 +572,226 @@ def is_manager(self, class_id): """ Check class id to determine if this is a manager """ return len(class_id) > 1 and class_id[0] == 1 and class_id[1] == 3 + def load_reference_resources(self): + """Load datatype and control class decriptors and create datatype JSON schemas""" + # Calculate paths to MS-05 descriptors + # including Feature Sets specified as additional_paths in test definition + spec_paths = [os.path.join(self.apis[FEATURE_SETS_KEY]["spec_path"], path) + for path in self.apis[FEATURE_SETS_KEY]["repo_paths"]] + spec_paths.append(self.apis[MS05_API_KEY]["spec_path"]) + # Root path for primitive datatypes + spec_paths.append('test_data/IS1201') + + datatype_paths = [] + classes_paths = [] + for spec_path in spec_paths: + datatype_path = os.path.abspath(os.path.join(spec_path, 'models/datatypes/')) + if os.path.exists(datatype_path): + datatype_paths.append(datatype_path) + classes_path = os.path.abspath(os.path.join(spec_path, 'models/classes/')) + if os.path.exists(classes_path): + classes_paths.append(classes_path) + + # Load class and datatype descriptors + self.reference_class_descriptors = self._load_model_descriptors(classes_paths) + + # Load MS-05 datatype descriptors + self.reference_datatype_descriptors = self._load_model_descriptors(datatype_paths) + + # Generate MS-05 datatype schemas from MS-05 datatype descriptors + self.reference_datatype_schemas = self.generate_json_schemas( + datatype_descriptors=self.reference_datatype_descriptors, + schema_path=os.path.join(self.apis[CONTROL_API_KEY]["spec_path"], 'APIs/schemas/')) + + def _load_model_descriptors(self, descriptor_paths): + descriptors = {} + for descriptor_path in descriptor_paths: + for filename in os.listdir(descriptor_path): + name, extension = os.path.splitext(filename) + if extension == ".json": + with open(os.path.join(descriptor_path, filename), 'r') as json_file: + descriptors[name] = json.load(json_file) + + return descriptors + + def generate_json_schemas(self, datatype_descriptors, schema_path): + """Generate datatype schemas from datatype descriptors""" + datatype_schema_names = [] + base_schema_path = os.path.abspath(schema_path) + if not os.path.exists(base_schema_path): + os.makedirs(base_schema_path) + + for name, descriptor in datatype_descriptors.items(): + json_schema = self.descriptor_to_schema(descriptor) + with open(os.path.join(base_schema_path, name + '.json'), 'w') as output_file: + json.dump(json_schema, output_file, indent=4) + datatype_schema_names.append(name) + + # Load resolved MS-05 datatype schemas + datatype_schemas = {} + for name in datatype_schema_names: + datatype_schemas[name] = load_resolved_schema(schema_path, name + '.json', path_prefix=False) + + return datatype_schemas + + def validate_descriptor(self, test, reference, descriptor, context=""): + """Validate descriptor against reference descriptor. Raises NMOSTestException on error""" + non_normative_keys = ['description'] + + if isinstance(reference, dict): + reference_keys = set(reference.keys()) + descriptor_keys = set(descriptor.keys()) + + # compare the keys to see if any extra/missing + key_diff = (set(reference_keys) | set(descriptor_keys)) - (set(reference_keys) & set(descriptor_keys)) + if len(key_diff) > 0: + error_description = "Missing keys " if set(key_diff) <= set(reference_keys) else "Additional keys " + raise NMOSTestException(test.FAIL(context + error_description + str(key_diff))) + for key in reference_keys: + if key in non_normative_keys and not isinstance(reference[key], dict): + continue + # Check for class ID + if key == 'classId' and isinstance(reference[key], list): + if reference[key] != descriptor[key]: + raise NMOSTestException(test.FAIL(context + "Unexpected ClassId. Expected: " + + str(reference[key]) + + " actual: " + str(descriptor[key]))) + else: + self.validate_descriptor(test, reference[key], descriptor[key], context=context + key + "->") + elif isinstance(reference, list): + if len(reference) > 0 and isinstance(reference[0], dict): + # Convert to dict and validate + references = {item['name']: item for item in reference} + descriptors = {item['name']: item for item in descriptor} + + self.validate_descriptor(test, references, descriptors, context) + elif reference != descriptor: + raise NMOSTestException(test.FAIL(context + "Unexpected sequence. Expected: " + + str(reference) + + " actual: " + str(descriptor))) + else: + if reference != descriptor: + raise NMOSTestException(test.FAIL(context + 'Expected value: ' + + str(reference) + + ', actual value: ' + + str(descriptor))) + return + + def _get_class_manager_descriptors(self, test, class_manager_oid, property_id): + response = self.get_property_value(test, class_manager_oid, property_id) + + if not response: + return None + + # Create descriptor dictionary from response array + # Use classId as key if present, otherwise use name + def key_lambda(classId, name): return ".".join(map(str, classId)) if classId else name + descriptors = {key_lambda(r.get('classId'), r['name']): r for r in response} + + return descriptors + + def query_device_model(self, test): + """ Query Device Model from the Node under test. + self.device_model_metadata set on Device Model validation error. + NMOSTestException raised if unable to query Device Model """ + self.open_ncp_websocket(test) + if not self.device_model: + self.device_model = self._nc_object_factory( + test, + StandardClassIds.NCBLOCK.value, + self.ROOT_BLOCK_OID, + "root") + + if not self.device_model: + raise NMOSTestException(test.FAIL("Unable to query Device Model")) + return self.device_model + + def get_class_manager(self, test): + """Get the Class Manager queried from the Node under test's Device Model""" + if not self.class_manager: + self.class_manager = self._get_manager(test, StandardClassIds.NCCLASSMANAGER.value) + + return self.class_manager + + def get_device_manager(self, test): + """Get the Device Manager queried from the Node under test's Device Model""" + return self._get_manager(test, StandardClassIds.NCDEVICEMANAGER.value) + + def _get_manager(self, test, class_id): + self.open_ncp_websocket(test) + device_model = self.query_device_model(test) + members = device_model.find_members_by_class_id(class_id, include_derived=True) + + spec_link = "https://specs.amwa.tv/ms-05-02/branches/{}/docs/Managers.html".format(self.spec_branch) + + if len(members) == 0: + raise NMOSTestException(test.FAIL("Manager not found in Root Block.", spec_link)) + + if len(members) > 1: + raise NMOSTestException(test.FAIL("Manager MUST be a singleton.", spec_link)) + + return members[0] + + def _nc_object_factory(self, test, class_id, oid, role): + """Create NcObject or NcBlock based on class_id""" + # will set self.device_model_error to True if problems encountered + try: + runtime_constraints = self.get_property_value( + test, + oid, + NcObjectProperties.RUNTIME_PROPERTY_CONSTRAINTS.value) + + # Check class id to determine if this is a block + if len(class_id) > 1 and class_id[0] == 1 and class_id[1] == 1: + member_descriptors = self.get_property_value( + test, + oid, + NcBlockProperties.MEMBERS.value) + + nc_block = NcBlock(class_id, oid, role, member_descriptors, runtime_constraints) + + for m in member_descriptors: + child_object = self._nc_object_factory(test, m["classId"], m["oid"], m["role"]) + if child_object: + nc_block.add_child_object(child_object) + + return nc_block + else: + # Check to determine if this is a Class Manager + if len(class_id) > 2 and class_id[0] == 1 and class_id[1] == 3 and class_id[2] == 2: + class_descriptors = self._get_class_manager_descriptors( + test, + oid, + NcClassManagerProperties.CONTROL_CLASSES.value) + + datatype_descriptors = self._get_class_manager_descriptors( + test, + oid, + NcClassManagerProperties.DATATYPES.value) + + if not class_descriptors or not datatype_descriptors: + # An error has likely occured + return None + + return NcClassManager(class_id, + oid, + role, + class_descriptors, + datatype_descriptors, + runtime_constraints) + + return NcObject(class_id, oid, role, runtime_constraints) + + except NMOSTestException as e: + raise NMOSTestException(test.FAIL("Error in Device Model " + role + ": " + str(e.args[0].detail))) + + def resolve_datatype(self, test, datatype): + """Resolve datatype to its base type""" + class_manager = self.get_class_manager(test) + if class_manager.datatype_descriptors[datatype].get("parentType"): + return self.resolve_datatype(test, class_manager.datatype_descriptors[datatype].get("parentType")) + return datatype + class NcObject(): def __init__(self, class_id, oid, role, runtime_constraints): diff --git a/nmostesting/suites/IS1201Test.py b/nmostesting/suites/IS1201Test.py index e4187bf6..af39bf10 100644 --- a/nmostesting/suites/IS1201Test.py +++ b/nmostesting/suites/IS1201Test.py @@ -12,25 +12,22 @@ # See the License for the specific language governing permissions and # limitations under the License. -import json import os import time from itertools import product -from jsonschema import ValidationError, SchemaError from ..Config import WS_MESSAGE_TIMEOUT from ..GenericTest import GenericTest, NMOSTestException -from ..IS12Utils import IS12Utils, NcObject, NcMethodStatus, NcBlockProperties, NcPropertyChangeType, \ - NcObjectMethods, NcObjectProperties, NcObjectEvents, NcClassManagerProperties, NcDeviceManagerProperties, \ - StandardClassIds, NcClassManager, NcBlock, NcDatatypeType -from ..TestHelper import load_resolved_schema +from ..IS12Utils import IS12Utils, NcMethodStatus, NcBlockProperties, NcPropertyChangeType, \ + NcObjectMethods, NcObjectProperties, NcObjectEvents, NcDeviceManagerProperties, \ + StandardClassIds, NcBlock, NcDatatypeType from ..TestResult import Test + NODE_API_KEY = "node" CONTROL_API_KEY = "ncp" MS05_API_KEY = "controlframework" -FEATURE_SETS_KEY = "featuresets" class IS1201Test(GenericTest): @@ -39,17 +36,14 @@ def __init__(self, apis, **kwargs): # Remove the RAML key to prevent this test suite from auto-testing IS-04 API apis[NODE_API_KEY].pop("raml", None) GenericTest.__init__(self, apis, **kwargs) - self.node_url = self.apis[NODE_API_KEY]["url"] - self.ncp_url = self.apis[CONTROL_API_KEY]["url"] - self.is12_utils = IS12Utils(self.node_url, - self.apis[CONTROL_API_KEY]["spec_path"], - self.apis[CONTROL_API_KEY]["spec_branch"]) - self.load_reference_resources() - self.device_model = None - self.class_manager = None - self.datatype_schemas = None + self.node_url = apis[NODE_API_KEY]["url"] + self.ncp_url = apis[CONTROL_API_KEY]["url"] + self.is12_utils = IS12Utils(apis) + self.is12_utils.load_reference_resources() def set_up_tests(self): + super().set_up_tests() + self.datatype_schemas = None self.unique_roles_error = False self.unique_oids_error = False self.managers_are_singletons_error = False @@ -83,144 +77,75 @@ def execute_tests(self, test_names): self.result.append(e.args[0]) self.execute_test(test_name) - def load_model_descriptors(self, descriptor_paths): - descriptors = {} - for descriptor_path in descriptor_paths: - for filename in os.listdir(descriptor_path): - name, extension = os.path.splitext(filename) - if extension == ".json": - with open(os.path.join(descriptor_path, filename), 'r') as json_file: - descriptors[name] = json.load(json_file) - - return descriptors - - def generate_json_schemas(self, datatype_descriptors, schema_path): - """Generate datatype schemas from datatype descriptors""" - datatype_schema_names = [] - base_schema_path = os.path.abspath(schema_path) - if not os.path.exists(base_schema_path): - os.makedirs(base_schema_path) - - for name, descriptor in datatype_descriptors.items(): - json_schema = self.is12_utils.descriptor_to_schema(descriptor) - with open(os.path.join(base_schema_path, name + '.json'), 'w') as output_file: - json.dump(json_schema, output_file, indent=4) - datatype_schema_names.append(name) - - # Load resolved MS-05 datatype schemas - datatype_schemas = {} - for name in datatype_schema_names: - datatype_schemas[name] = load_resolved_schema(schema_path, name + '.json', path_prefix=False) - - return datatype_schemas - - def load_reference_resources(self): - """Load datatype and control class decriptors and create datatype JSON schemas""" - # Calculate paths to MS-05 descriptors - # including Feature Sets specified as additional_paths in test definition - spec_paths = [os.path.join(self.apis[FEATURE_SETS_KEY]["spec_path"], path) - for path in self.apis[FEATURE_SETS_KEY]["repo_paths"]] - spec_paths.append(self.apis[MS05_API_KEY]["spec_path"]) - # Root path for primitive datatypes - spec_paths.append('test_data/IS1201') - - datatype_paths = [] - classes_paths = [] - for spec_path in spec_paths: - datatype_path = os.path.abspath(os.path.join(spec_path, 'models/datatypes/')) - if os.path.exists(datatype_path): - datatype_paths.append(datatype_path) - classes_path = os.path.abspath(os.path.join(spec_path, 'models/classes/')) - if os.path.exists(classes_path): - classes_paths.append(classes_path) - - # Load class and datatype descriptors - self.reference_class_descriptors = self.load_model_descriptors(classes_paths) - - # Load MS-05 datatype descriptors - self.reference_datatype_descriptors = self.load_model_descriptors(datatype_paths) - - # Generate MS-05 datatype schemas from MS-05 datatype descriptors - self.reference_datatype_schemas = self.generate_json_schemas( - datatype_descriptors=self.reference_datatype_descriptors, - schema_path=os.path.join(self.apis[CONTROL_API_KEY]["spec_path"], 'APIs/schemas/')) - - def create_ncp_socket(self, test): - """Create a WebSocket client connection to Node under test. Raises NMOSTestException on error""" - self.is12_utils.open_ncp_websocket(test, self.apis[CONTROL_API_KEY]["url"]) - - def validate_descriptor(self, test, reference, descriptor, context=""): - """Validate descriptor against reference descriptor. Raises NMOSTestException on error""" - non_normative_keys = ['description'] - - if isinstance(reference, dict): - reference_keys = set(reference.keys()) - descriptor_keys = set(descriptor.keys()) - - # compare the keys to see if any extra/missing - key_diff = (set(reference_keys) | set(descriptor_keys)) - (set(reference_keys) & set(descriptor_keys)) - if len(key_diff) > 0: - error_description = "Missing keys " if set(key_diff) <= set(reference_keys) else "Additional keys " - raise NMOSTestException(test.FAIL(context + error_description + str(key_diff))) - for key in reference_keys: - if key in non_normative_keys and not isinstance(reference[key], dict): - continue - # Check for class ID - if key == 'classId' and isinstance(reference[key], list): - if reference[key] != descriptor[key]: - raise NMOSTestException(test.FAIL(context + "Unexpected ClassId. Expected: " - + str(reference[key]) - + " actual: " + str(descriptor[key]))) - else: - self.validate_descriptor(test, reference[key], descriptor[key], context=context + key + "->") - elif isinstance(reference, list): - if len(reference) > 0 and isinstance(reference[0], dict): - # Convert to dict and validate - references = {item['name']: item for item in reference} - descriptors = {item['name']: item for item in descriptor} - - self.validate_descriptor(test, references, descriptors, context) - elif reference != descriptor: - raise NMOSTestException(test.FAIL(context + "Unexpected sequence. Expected: " - + str(reference) - + " actual: " + str(descriptor))) - else: - if reference != descriptor: - raise NMOSTestException(test.FAIL(context + 'Expected value: ' - + str(reference) - + ', actual value: ' - + str(descriptor))) - return + def get_datatype_schema(self, test, type_name): + """Get generated JSON schema for datatype specified""" + if not self.datatype_schemas: + self.datatype_schemas = self._generate_device_model_datatype_schemas(test) - def _validate_schema(self, test, payload, schema, context=""): - """Delegates to validate_schema. Raises NMOSTestExceptions on error""" - if not schema: - raise NMOSTestException(test.FAIL(context + "Missing schema. ")) - try: - # Validate the JSON schema is correct - self.validate_schema(payload, schema) - except ValidationError as e: - raise NMOSTestException(test.FAIL(context + "Schema validation error: " + e.message)) - except SchemaError as e: - raise NMOSTestException(test.FAIL(context + "Schema error: " + e.message)) + return self.datatype_schemas.get(type_name) - return + def _generate_device_model_datatype_schemas(self, test): + # Generate datatype schemas based on the datatype decriptors + # queried from the Node under test's Device Model. + # This will include any Non-standard data types + class_manager = self.is12_utils.get_class_manager(test) - def get_class_manager_descriptors(self, test, class_manager_oid, property_id, role): - response = self.get_property_value(test, class_manager_oid, property_id, role) + # Create JSON schemas for the queried datatypes + return self.is12_utils.generate_json_schemas( + datatype_descriptors=class_manager.datatype_descriptors, + schema_path=os.path.join(self.apis[CONTROL_API_KEY]["spec_path"], 'APIs/tmp_schemas/')) - if not response: - return None + def _validate_property_type(self, test, value, data_type, is_nullable, context=""): + """Validate the the property value is correct according to the type. Raises NMOSTestException on error""" + if value is None: + if is_nullable: + return + else: + raise NMOSTestException(test.FAIL(context + "Non-nullable property set to null.")) + + if self.is12_utils.primitive_to_python_type(data_type): + # Special case: if this is a floating point value it + # can be intepreted as an int in the case of whole numbers + # e.g. 0.0 -> 0, 1.0 -> 1 + if self.is12_utils.primitive_to_python_type(data_type) == float and isinstance(value, int): + return + + if not isinstance(value, self.is12_utils.primitive_to_python_type(data_type)): + raise NMOSTestException(test.FAIL(context + str(value) + " is not of type " + str(data_type))) + else: + self.is12_utils.validate_schema(test, value, self.get_datatype_schema(test, data_type), context) + + return - # Create descriptor dictionary from response array - # Use classId as key if present, otherwise use name - def key_lambda(classId, name): return ".".join(map(str, classId)) if classId else name - descriptors = {key_lambda(r.get('classId'), r['name']): r for r in response} + def get_property(self, test, oid, property_id, context): + """Get property from object. Sets self.device_model_metadata on error""" + try: + return self.is12_utils.get_property(test, oid, property_id) + except NMOSTestException as e: + self.device_model_metadata["error"] = True + self.device_model_metadata["error_msg"] += context \ + + "Error getting property: " \ + + str(property_id) + ": " \ + + str(e.args[0].detail) \ + + "; " + return None - return descriptors + def get_property_value(self, test, oid, property_id, context): + """Get value of property from object. Sets self.device_model_metadata on error""" + try: + return self.is12_utils.get_property_value(test, oid, property_id) + except NMOSTestException as e: + self.device_model_metadata["error"] = True + self.device_model_metadata["error_msg"] += context \ + + "Error getting property: " \ + + str(property_id) + ": " \ + + str(e.args[0].detail) \ + + "; " + return None def validate_model_definitions(self, descriptors, schema_name, reference_descriptors): - """Validate class manager model definitions against reference model descriptors. Returns [test result array]""" + """ Validate Class Manager model definitions against reference model descriptors. + Returns [test result array] """ results = list() reference_descriptor_keys = sorted(reference_descriptors.keys()) @@ -231,11 +156,11 @@ def validate_model_definitions(self, descriptors, schema_name, reference_descrip if descriptors.get(key): descriptor = descriptors[key] - # Validate the JSON schema is correct - self._validate_schema(test, descriptor, self.reference_datatype_schemas[schema_name]) + # Validate descriptor obeys the JSON schema + self.is12_utils.validate_reference_datatype_schema(test, descriptor, schema_name) # Validate the descriptor is correct - self.validate_descriptor(test, reference_descriptors[key], descriptor) + self.is12_utils.validate_descriptor(test, reference_descriptors[key], descriptor) results.append(test.PASS()) else: @@ -245,40 +170,6 @@ def validate_model_definitions(self, descriptors, schema_name, reference_descrip return results - def query_device_model(self, test): - self.create_ncp_socket(test) - if not self.device_model: - self.device_model = self.nc_object_factory(test, - StandardClassIds.NCBLOCK.value, - self.is12_utils.ROOT_BLOCK_OID, - "root") - if not self.device_model: - raise NMOSTestException(test.FAIL("Unable to query Device Model: " - + self.device_model_metadata["error_msg"])) - return self.device_model - - def get_manager(self, test, class_id): - self.create_ncp_socket(test) - device_model = self.query_device_model(test) - members = device_model.find_members_by_class_id(class_id, include_derived=True) - - spec_link = "https://specs.amwa.tv/ms-05-02/branches/{}/docs/Managers.html"\ - .format(self.apis[CONTROL_API_KEY]["spec_branch"]) - - if len(members) == 0: - raise NMOSTestException(test.FAIL("Manager not found in Root Block.", spec_link)) - - if len(members) > 1: - raise NMOSTestException(test.FAIL("Manager MUST be a singleton.", spec_link)) - - return members[0] - - def get_class_manager(self, test): - if not self.class_manager: - self.class_manager = self.get_manager(test, StandardClassIds.NCCLASSMANAGER.value) - - return self.class_manager - def auto_tests(self): """Automatically validate all standard datatypes and control classes. Returns [test result array]""" # https://specs.amwa.tv/ms-05-02/releases/v1.0.0/docs/Framework.html @@ -286,17 +177,17 @@ def auto_tests(self): results = list() test = Test("Initialize auto tests", "auto_init") - self.create_ncp_socket(test) + self.is12_utils.open_ncp_websocket(test) - class_manager = self.get_class_manager(test) + class_manager = self.is12_utils.get_class_manager(test) results += self.validate_model_definitions(class_manager.class_descriptors, 'NcClassDescriptor', - self.reference_class_descriptors) + self.is12_utils.reference_class_descriptors) results += self.validate_model_definitions(class_manager.datatype_descriptors, 'NcDatatypeDescriptor', - self.reference_datatype_descriptors) + self.is12_utils.reference_datatype_descriptors) return results def test_01(self, test): @@ -316,7 +207,7 @@ def test_02(self, test): """WebSocket: endpoint successfully opened""" # https://specs.amwa.tv/is-12/releases/v1.0.0/docs/Transport_and_message_encoding.html - self.create_ncp_socket(test) + self.is12_utils.open_ncp_websocket(test) return test.PASS() @@ -324,7 +215,7 @@ def test_03(self, test): """WebSocket: socket is kept open until client closes""" # https://specs.amwa.tv/is-12/releases/v1.0.0/docs/Protocol_messaging.html#control-session - self.create_ncp_socket(test) + self.is12_utils.open_ncp_websocket(test) # Ensure WebSocket remains open start_time = time.time() @@ -339,11 +230,12 @@ def test_04(self, test): """Device Model: Root Block exists with correct oid and role""" # https://specs.amwa.tv/ms-05-02/releases/v1.0.0/docs/Blocks.html - self.create_ncp_socket(test) + self.is12_utils.open_ncp_websocket(test) - role = self.is12_utils.get_property_value(test, - self.is12_utils.ROOT_BLOCK_OID, - NcObjectProperties.ROLE.value) + role = self.is12_utils.get_property_value( + test, + self.is12_utils.ROOT_BLOCK_OID, + NcObjectProperties.ROLE.value) if role != "root": return test.FAIL("Unexpected role in Root Block: " + str(role), @@ -353,27 +245,6 @@ def test_04(self, test): return test.PASS() - def validate_property_type(self, test, value, data_type, is_nullable, context=""): - if value is None: - if is_nullable: - return - else: - raise NMOSTestException(test.FAIL(context + "Non-nullable property set to null.")) - - if self.is12_utils.primitive_to_python_type(data_type): - # Special case: if this is a floating point value it - # can be intepreted as an int in the case of whole numbers - # e.g. 0.0 -> 0, 1.0 -> 1 - if self.is12_utils.primitive_to_python_type(data_type) == float and isinstance(value, int): - return - - if not isinstance(value, self.is12_utils.primitive_to_python_type(data_type)): - raise NMOSTestException(test.FAIL(context + str(value) + " is not of type " + str(data_type))) - else: - self._validate_schema(test, value, self.datatype_schemas.get(data_type), context) - - return - def check_get_sequence_item(self, test, oid, sequence_values, property_metadata, context=""): if sequence_values is None and not property_metadata["isNullable"]: self.get_sequence_item_metadata["error"] = True @@ -428,30 +299,6 @@ def check_sequence_methods(self, test, oid, sequence_values, property_metadata, self.check_get_sequence_item(test, oid, sequence_values, property_metadata, context) self.check_get_sequence_length(test, oid, sequence_values, property_metadata, context) - def get_property_value(self, test, oid, property_id, context): - try: - return self.is12_utils.get_property_value(test, oid, property_id) - except NMOSTestException as e: - self.device_model_metadata["error"] = True - self.device_model_metadata["error_msg"] += context \ - + "Error getting property: " \ - + str(property_id) + ": " \ - + str(e.args[0].detail) \ - + "; " - return None - - def get_property(self, test, oid, property_id, context): - try: - return self.is12_utils.get_property(test, oid, property_id) - except NMOSTestException as e: - self.device_model_metadata["error"] = True - self.device_model_metadata["error_msg"] += context \ - + "Error getting property: " \ - + str(property_id) + ": " \ - + str(e.args[0].detail) \ - + "; " - return None - def check_object_properties(self, test, reference_class_descriptor, oid, context): for class_property in reference_class_descriptor['properties']: response = self.get_property(test, oid, class_property.get('id'), context) @@ -479,24 +326,26 @@ def check_object_properties(self, test, reference_class_descriptor, oid, context # validate property type if class_property['isSequence']: for property_value in object_property: - self.validate_property_type(test, - property_value, - class_property['typeName'], - class_property['isNullable'], - context=context + class_property["typeName"] - + ": " + class_property["name"] + ": ") + self._validate_property_type( + test, + property_value, + class_property['typeName'], + class_property['isNullable'], + context=context + class_property["typeName"] + + ": " + class_property["name"] + ": ") self.check_sequence_methods(test, oid, object_property, class_property, context=context) else: - self.validate_property_type(test, - object_property, - class_property['typeName'], - class_property['isNullable'], - context=context + class_property["typeName"] - + class_property["name"] + ": ") + self._validate_property_type( + test, + object_property, + class_property['typeName'], + class_property['isNullable'], + context=context + class_property["typeName"] + + class_property["name"] + ": ") return def check_unique_roles(self, role, role_cache): @@ -530,21 +379,25 @@ def check_manager(self, class_id, owner, class_descriptors, manager_cache): def check_touchpoints(self, test, oid, context): """Touchpoint checks""" - touchpoints = self.get_property_value(test, - oid, - NcObjectProperties.TOUCHPOINTS.value, - context) + touchpoints = self.get_property_value( + test, + oid, + NcObjectProperties.TOUCHPOINTS.value, + context) + if touchpoints is not None: self.touchpoints_metadata["checked"] = True try: for touchpoint in touchpoints: - schema = self.datatype_schemas.get("NcTouchpointNmos") \ + schema = self.get_datatype_schema(test, "NcTouchpointNmos") \ if touchpoint["contextNamespace"] == "x-nmos" \ - else self.datatype_schemas.get("NcTouchpointNmosChannelMapping") - self._validate_schema(test, - touchpoint, - schema, - context=context + schema["title"] + ": ") + else self.get_datatype_schema(test, "NcTouchpointNmosChannelMapping") + self.is12_utils.validate_schema( + test, + touchpoint, + schema, + context=context + schema["title"] + ": ") + except NMOSTestException as e: self.touchpoints_metadata["error"] = True self.touchpoints_metadata["error_msg"] = context + str(e.args[0].detail) @@ -560,10 +413,11 @@ def check_block(self, test, block, class_descriptors, context=""): role_cache = [] manager_cache = [] for descriptor in block.member_descriptors: - self._validate_schema(test, - descriptor, - self.datatype_schemas.get("NcBlockMemberDescriptor"), - context="NcBlockMemberDescriptor: ") + self.is12_utils.validate_schema( + test, + descriptor, + self.get_datatype_schema(test, "NcBlockMemberDescriptor"), + context="NcBlockMemberDescriptor: ") self.check_unique_roles(descriptor['role'], role_cache) self.check_unique_oid(descriptor['oid']) @@ -585,7 +439,7 @@ def check_block(self, test, block, class_descriptors, context=""): + "Class not advertised by Class Manager: " \ + str(descriptor['classId']) + ". " - if class_identifier not in self.reference_class_descriptors and \ + if class_identifier not in self.is12_utils.reference_class_descriptors and \ not self.is12_utils.is_non_standard_class(descriptor['classId']): # Not a standard or non-standard class self.organization_metadata["error"] = True @@ -593,27 +447,11 @@ def check_block(self, test, block, class_descriptors, context=""): + "Non-standard class id does not contain authority key: " \ + str(descriptor['classId']) + ". " - def generate_device_model_datatype_schemas(self, test): - # Generate datatype schemas based on the datatype decriptors - # queried from the Node under test's Device Model. - # This will include any Non-standard data types - if self.datatype_schemas: - return - - class_manager = self.get_class_manager(test) - - # Create JSON schemas for the queried datatypes - self.datatype_schemas = self.generate_json_schemas( - datatype_descriptors=class_manager.datatype_descriptors, - schema_path=os.path.join(self.apis[CONTROL_API_KEY]["spec_path"], 'APIs/tmp_schemas/')) - def check_device_model(self, test): if not self.device_model_metadata["checked"]: - self.create_ncp_socket(test) - class_manager = self.get_class_manager(test) - device_model = self.query_device_model(test) - - self.generate_device_model_datatype_schemas(test) + self.is12_utils.open_ncp_websocket(test) + class_manager = self.is12_utils.get_class_manager(test) + device_model = self.is12_utils.query_device_model(test) self.check_block(test, device_model, @@ -622,42 +460,6 @@ def check_device_model(self, test): self.device_model_metadata["checked"] = True return - def nc_object_factory(self, test, class_id, oid, role): - """Create NcObject or NcBlock based on class_id""" - runtime_constraints = self.get_property_value(test, - oid, - NcObjectProperties.RUNTIME_PROPERTY_CONSTRAINTS.value, - role + ": ") - - # Check class id to determine if this is a block - if len(class_id) > 1 and class_id[0] == 1 and class_id[1] == 1: - member_descriptors = self.get_property_value(test, oid, NcBlockProperties.MEMBERS.value, role + ": ") - - nc_block = NcBlock(class_id, oid, role, member_descriptors, runtime_constraints) - - for m in member_descriptors: - child_object = self.nc_object_factory(test, m["classId"], m["oid"], m["role"]) - if child_object: - nc_block.add_child_object(child_object) - return nc_block - else: - # Check to determine if this is a Class Manager - if len(class_id) > 2 and class_id[0] == 1 and class_id[1] == 3 and class_id[2] == 2: - class_descriptors = self.get_class_manager_descriptors(test, - oid, - NcClassManagerProperties.CONTROL_CLASSES.value, - role + ": ") - datatype_descriptors = self.get_class_manager_descriptors(test, - oid, - NcClassManagerProperties.DATATYPES.value, - role + ": ") - if not class_descriptors or not datatype_descriptors: - # An error has likely occured - return None - - return NcClassManager(class_id, oid, role, class_descriptors, datatype_descriptors, runtime_constraints) - return NcObject(class_id, oid, role, runtime_constraints) - def test_05(self, test): """Device Model: Device Model is correct according to classes and datatypes advertised by Class Manager""" # https://specs.amwa.tv/ms-05-02/releases/v1.0.0/docs/Managers.html @@ -844,10 +646,10 @@ def test_13(self, test): spec_link = "https://specs.amwa.tv/ms-05-02/branches/{}/docs/Managers.html"\ .format(self.apis[CONTROL_API_KEY]["spec_branch"]) - class_manager = self.get_class_manager(test) + class_manager = self.is12_utils.get_class_manager(test) class_id_str = ".".join(map(str, StandardClassIds.NCCLASSMANAGER.value)) - class_descriptor = self.reference_class_descriptors[class_id_str] + class_descriptor = self.is12_utils.reference_class_descriptors[class_id_str] if class_manager.role != class_descriptor["fixedRole"]: return test.FAIL("Class Manager MUST have a role of ClassManager.", spec_link) @@ -862,10 +664,10 @@ def test_14(self, test): spec_link = "https://specs.amwa.tv/ms-05-02/branches/{}/docs/Managers.html"\ .format(self.apis[CONTROL_API_KEY]["spec_branch"]) - device_manager = self.get_manager(test, StandardClassIds.NCDEVICEMANAGER.value) + device_manager = self.is12_utils.get_device_manager(test) class_id_str = ".".join(map(str, StandardClassIds.NCDEVICEMANAGER.value)) - class_descriptor = self.reference_class_descriptors[class_id_str] + class_descriptor = self.is12_utils.reference_class_descriptors[class_id_str] if device_manager.role != class_descriptor["fixedRole"]: return test.FAIL("Device Manager MUST have a role of DeviceManager.", spec_link) @@ -887,7 +689,7 @@ def test_15(self, test): # specification it MUST comply with the model definitions published # https://specs.amwa.tv/ms-05-02/releases/v1.0.0/docs/Framework.html#ncclassmanager - class_manager = self.get_class_manager(test) + class_manager = self.is12_utils.get_class_manager(test) for _, class_descriptor in class_manager.class_descriptors.items(): for include_inherited in [False, True]: @@ -897,10 +699,11 @@ def test_15(self, test): include_inherited) expected_descriptor = class_manager.get_control_class(class_descriptor["classId"], include_inherited) - self.validate_descriptor(test, - expected_descriptor, - actual_descriptor, - context=str(class_descriptor["classId"]) + ": ") + self.is12_utils.validate_descriptor( + test, + expected_descriptor, + actual_descriptor, + context=str(class_descriptor["classId"]) + ": ") return test.PASS() @@ -910,7 +713,7 @@ def test_16(self, test): # specification it MUST comply with the model definitions published # https://specs.amwa.tv/ms-05-02/releases/v1.0.0/docs/Framework.html#ncclassmanager - class_manager = self.get_class_manager(test) + class_manager = self.is12_utils.get_class_manager(test) for _, datatype_descriptor in class_manager.datatype_descriptors.items(): for include_inherited in [False, True]: @@ -920,10 +723,11 @@ def test_16(self, test): include_inherited) expected_descriptor = class_manager.get_datatype(datatype_descriptor["name"], include_inherited) - self.validate_descriptor(test, - expected_descriptor, - actual_descriptor, - context=datatype_descriptor["name"] + ": ") + self.is12_utils.validate_descriptor( + test, + expected_descriptor, + actual_descriptor, + context=datatype_descriptor["name"] + ": ") return test.PASS() @@ -938,7 +742,7 @@ def test_17(self, test): .format(self.apis[MS05_API_KEY]["spec_branch"]) # Attempt to set labels - self.create_ncp_socket(test) + self.is12_utils.open_ncp_websocket(test) property_id = NcObjectProperties.USER_LABEL.value @@ -1038,12 +842,13 @@ def do_get_member_descriptors_test(self, test, block, context=""): expected_members_oids = [m["oid"] for m in expected_members] for queried_member in queried_members: - self._validate_schema(test, - queried_member, - self.reference_datatype_schemas["NcBlockMemberDescriptor"], - context=context - + block.role - + ": NcBlockMemberDescriptor: ") + self.is12_utils.validate_reference_datatype_schema( + test, + queried_member, + "NcBlockMemberDescriptor", + context=context + + block.role + + ": NcBlockMemberDescriptor: ") if queried_member["oid"] not in expected_members_oids: raise NMOSTestException(test.FAIL(context @@ -1056,7 +861,7 @@ def test_20(self, test): # specification it MUST comply with the model definitions published # https://specs.amwa.tv/ms-05-02/releases/v1.0.0/docs/Framework.html#ncblock - device_model = self.query_device_model(test) + device_model = self.is12_utils.query_device_model(test) self.do_get_member_descriptors_test(test, device_model) @@ -1084,12 +889,13 @@ def do_find_member_by_path_test(self, test, block, context=""): + str(role_path))) for queried_member in queried_members: - self._validate_schema(test, - queried_member, - self.reference_datatype_schemas["NcBlockMemberDescriptor"], - context=context - + block.role - + ": NcBlockMemberDescriptor: ") + self.is12_utils.validate_reference_datatype_schema( + test, + queried_member, + "NcBlockMemberDescriptor", + context=context + + block.role + + ": NcBlockMemberDescriptor: ") if len(queried_members) != 1: raise NMOSTestException(test.FAIL(context @@ -1110,7 +916,7 @@ def test_21(self, test): # specification it MUST comply with the model definitions published # https://specs.amwa.tv/ms-05-02/releases/v1.0.0/docs/Framework.html#ncblock - device_model = self.query_device_model(test) + device_model = self.is12_utils.query_device_model(test) # Recursively check each block in Device Model self.do_find_member_by_path_test(test, device_model) @@ -1184,7 +990,7 @@ def test_22(self, test): # specification it MUST comply with the model definitions published # https://specs.amwa.tv/ms-05-02/releases/v1.0.0/docs/Framework.html#ncblock - device_model = self.query_device_model(test) + device_model = self.is12_utils.query_device_model(test) # Recursively check each block in Device Model self.do_find_member_by_role_test(test, device_model) @@ -1245,7 +1051,7 @@ def test_23(self, test): # specification it MUST comply with the model definitions published # https://specs.amwa.tv/ms-05-02/releases/v1.0.0/docs/Framework.html#ncblock - device_model = self.query_device_model(test) + device_model = self.is12_utils.query_device_model(test) self.do_find_members_by_class_id_test(test, device_model) @@ -1257,7 +1063,7 @@ def do_error_test(self, test, command_json, expected_status=None): # check the syntax of the error message according to is12_error try: - self.create_ncp_socket(test) + self.is12_utils.open_ncp_websocket(test) self.is12_utils.send_command(test, command_json) @@ -1361,7 +1167,7 @@ def test_28(self, test): # MS-05-02 (15) Devices MUST use the exact status code from NcMethodStatus when errors are encountered # for the following scenarios... - device_model = self.query_device_model(test) + device_model = self.is12_utils.query_device_model(test) # Calculate invalid oid from the max oid value in device model oids = device_model.get_oids() invalid_oid = max(oids) + 1 @@ -1432,7 +1238,7 @@ def test_32(self, test): # for the following scenarios... # https://specs.amwa.tv/ms-05-02/releases/v1.0.0/docs/Framework.html#ncmethodresult - self.create_ncp_socket(test) + self.is12_utils.open_ncp_websocket(test) length = self.is12_utils.get_sequence_length(test, self.is12_utils.ROOT_BLOCK_OID, @@ -1455,7 +1261,7 @@ def test_33(self, test): # https://specs.amwa.tv/is-12/releases/v1.0.0/docs/Protocol_messaging.html#subscription-message-type # https://specs.amwa.tv/is-12/releases/v1.0.0/docs/Protocol_messaging.html#subscription-response-message-type - device_model = self.query_device_model(test) + device_model = self.is12_utils.query_device_model(test) # Get all oids for objects in this Device Model device_model_objects = device_model.find_members_by_class_id(class_id=StandardClassIds.NCOBJECT.value, @@ -1475,7 +1281,9 @@ def test_33(self, test): context = "oid: " + str(oid) + ", " + # Each label will be set twice; once to the new user label, and then again back to the old user label for label in [new_user_label, old_user_label]: + # Set property and log notificaiton self.is12_utils.start_logging_notifications() self.is12_utils.set_property(test, oid, NcObjectProperties.USER_LABEL.value, label) self.is12_utils.stop_logging_notifications() @@ -1507,6 +1315,7 @@ def test_33(self, test): oids[oid] += 1 + # We expect each object to have 2 notifications (set to new user label, set to old user label) if not all(v == 2 for v in oids.values()): error = True error_message += "Notifications not received for Oids " \ @@ -1519,15 +1328,9 @@ def test_33(self, test): return test.FAIL(error_message) return test.PASS() - def resolve_datatype(self, test, datatype): - class_manager = self.get_class_manager(test) - if class_manager.datatype_descriptors[datatype].get("parentType"): - return self.resolve_datatype(test, class_manager.datatype_descriptors[datatype].get("parentType")) - return datatype - def check_constraint(self, test, constraint, type_name, is_sequence, test_metadata, context): if constraint.get("defaultValue"): - datatype_schema = self.datatype_schemas.get(type_name) + datatype_schema = self.get_datatype_schema(test, type_name) if isinstance(constraint.get("defaultValue"), list) is not is_sequence: test_metadata["error"] = True test_metadata["error_msg"] = context + (" a default value sequence was expected" @@ -1535,14 +1338,15 @@ def check_constraint(self, test, constraint, type_name, is_sequence, test_metada return if is_sequence: for value in constraint.get("defaultValue"): - self._validate_schema(test, value, datatype_schema, context + ": defaultValue ") + self.is12_utils.validate_schema(test, value, datatype_schema, context + ": defaultValue ") else: - self._validate_schema(test, - constraint.get("defaultValue"), - datatype_schema, - context + ": defaultValue ") + self.is12_utils.validate_schema( + test, + constraint.get("defaultValue"), + datatype_schema, + context + ": defaultValue ") - datatype = self.resolve_datatype(test, type_name) + datatype = self.is12_utils.resolve_datatype(test, type_name) # check NcXXXConstraintsNumber if constraint.get("minimum") or constraint.get("maximum") or constraint.get("step"): constraint_type = "NcPropertyConstraintsNumber" \ @@ -1588,10 +1392,8 @@ def do_validate_runtime_constraints_test(self, test, nc_object, class_manager, c def test_34(self, test): """Constraints: validate runtime constraints""" - device_model = self.query_device_model(test) - class_manager = self.get_class_manager(test) - - self.generate_device_model_datatype_schemas(test) + device_model = self.is12_utils.query_device_model(test) + class_manager = self.is12_utils.get_class_manager(test) self.do_validate_runtime_constraints_test(test, device_model, class_manager) @@ -1628,10 +1430,8 @@ def do_validate_property_constraints_test(self, test, nc_object, class_manager, def test_35(self, test): """Constraints: validate property constraints""" - device_model = self.query_device_model(test) - class_manager = self.get_class_manager(test) - - self.generate_device_model_datatype_schemas(test) + device_model = self.is12_utils.query_device_model(test) + class_manager = self.is12_utils.get_class_manager(test) self.do_validate_property_constraints_test(test, device_model, class_manager) @@ -1662,9 +1462,7 @@ def do_validate_datatype_constraints_test(self, test, datatype, type_name, conte def test_36(self, test): """Constraints: validate datatype constraints""" - class_manager = self.get_class_manager(test) - - self.generate_device_model_datatype_schemas(test) + class_manager = self.is12_utils.get_class_manager(test) for _, datatype in class_manager.datatype_descriptors.items(): self.do_validate_datatype_constraints_test(test, datatype, datatype["name"]) diff --git a/nmostesting/suites/IS1202Test.py b/nmostesting/suites/IS1202Test.py index 6d4bbe6b..7cd95523 100644 --- a/nmostesting/suites/IS1202Test.py +++ b/nmostesting/suites/IS1202Test.py @@ -12,23 +12,17 @@ # See the License for the specific language governing permissions and # limitations under the License. -import json -import os import re import sys -from jsonschema import ValidationError, SchemaError from math import floor from xeger import Xeger from ..Config import IS12_INTERACTIVE_TESTING from ..GenericTest import NMOSTestException from ..ControllerTest import ControllerTest, TestingFacadeException -from ..IS12Utils import IS12Utils, NcDatatypeType, NcObject, NcBlockProperties, \ - NcObjectProperties, NcClassManagerProperties, \ - StandardClassIds, NcClassManager, NcBlock -from ..TestHelper import load_resolved_schema -from ..TestResult import Test +from ..IS12Utils import IS12Utils, NcDatatypeType, \ + NcObjectProperties, NcBlock NODE_API_KEY = "node" CONTROL_API_KEY = "ncp" @@ -42,10 +36,8 @@ def __init__(self, apis, **kwargs): ControllerTest.__init__(self, apis, **kwargs) self.node_url = self.apis[NODE_API_KEY]["url"] self.ncp_url = self.apis[CONTROL_API_KEY]["url"] - self.is12_utils = IS12Utils(self.node_url, - self.apis[CONTROL_API_KEY]["spec_path"], - self.apis[CONTROL_API_KEY]["spec_branch"]) - self.load_reference_resources() + self.is12_utils = IS12Utils(apis) + self.is12_utils.load_reference_resources() self.device_model = None self.constraint_error = False self.constraint_error_msg = "" @@ -116,162 +108,9 @@ def post_tests_message(self): # post_test_introducton timed out pass - def load_model_descriptors(self, descriptor_paths): - descriptors = {} - for descriptor_path in descriptor_paths: - for filename in os.listdir(descriptor_path): - name, extension = os.path.splitext(filename) - if extension == ".json": - with open(os.path.join(descriptor_path, filename), 'r') as json_file: - descriptors[name] = json.load(json_file) - - return descriptors - - def generate_json_schemas(self, datatype_descriptors, schema_path): - """Generate datatype schemas from datatype descriptors""" - datatype_schema_names = [] - base_schema_path = os.path.abspath(schema_path) - if not os.path.exists(base_schema_path): - os.makedirs(base_schema_path) - - for name, descriptor in datatype_descriptors.items(): - json_schema = self.is12_utils.descriptor_to_schema(descriptor) - with open(os.path.join(base_schema_path, name + '.json'), 'w') as output_file: - json.dump(json_schema, output_file, indent=4) - datatype_schema_names.append(name) - - # Load resolved MS-05 datatype schemas - datatype_schemas = {} - for name in datatype_schema_names: - datatype_schemas[name] = load_resolved_schema(schema_path, name + '.json', path_prefix=False) - - return datatype_schemas - - def load_reference_resources(self): - """Load datatype and control class decriptors and create datatype JSON schemas""" - # Calculate paths to MS-05 descriptors - # including Feature Sets specified as additional_paths in test definition - spec_paths = [os.path.join(self.apis[FEATURE_SETS_KEY]["spec_path"], path) - for path in self.apis[FEATURE_SETS_KEY]["repo_paths"]] - spec_paths.append(self.apis[MS05_API_KEY]["spec_path"]) - # Root path for primitive datatypes - spec_paths.append('test_data/IS1201') - - datatype_paths = [] - classes_paths = [] - for spec_path in spec_paths: - datatype_path = os.path.abspath(os.path.join(spec_path, 'models/datatypes/')) - if os.path.exists(datatype_path): - datatype_paths.append(datatype_path) - classes_path = os.path.abspath(os.path.join(spec_path, 'models/classes/')) - if os.path.exists(classes_path): - classes_paths.append(classes_path) - - # Load class and datatype descriptors - self.reference_class_descriptors = self.load_model_descriptors(classes_paths) - - # Load MS-05 datatype descriptors - self.reference_datatype_descriptors = self.load_model_descriptors(datatype_paths) - - # Generate MS-05 datatype schemas from MS-05 datatype descriptors - self.datatype_schemas = self.generate_json_schemas( - datatype_descriptors=self.reference_datatype_descriptors, - schema_path=os.path.join(self.apis[CONTROL_API_KEY]["spec_path"], 'APIs/schemas/')) - def create_ncp_socket(self, test): """Create a WebSocket client connection to Node under test. Raises NMOSTestException on error""" - self.is12_utils.open_ncp_websocket(test, self.apis[CONTROL_API_KEY]["url"]) - - def validate_descriptor(self, test, reference, descriptor, context=""): - """Validate descriptor against reference descriptor. Raises NMOSTestException on error""" - non_normative_keys = ['description'] - - if isinstance(reference, dict): - reference_keys = set(reference.keys()) - descriptor_keys = set(descriptor.keys()) - - # compare the keys to see if any extra/missing - key_diff = (set(reference_keys) | set(descriptor_keys)) - (set(reference_keys) & set(descriptor_keys)) - if len(key_diff) > 0: - error_description = "Missing keys " if set(key_diff) <= set(reference_keys) else "Additional keys " - raise NMOSTestException(test.FAIL(context + error_description + str(key_diff))) - for key in reference_keys: - if key in non_normative_keys and not isinstance(reference[key], dict): - continue - # Check for class ID - if key == 'classId' and isinstance(reference[key], list): - if reference[key] != descriptor[key]: - raise NMOSTestException(test.FAIL(context + "Unexpected ClassId. Expected: " - + str(reference[key]) - + " actual: " + str(descriptor[key]))) - else: - self.validate_descriptor(test, reference[key], descriptor[key], context=context + key + "->") - elif isinstance(reference, list): - # Convert to dict and validate - references = {item['name']: item for item in reference} - descriptors = {item['name']: item for item in descriptor} - - return self.validate_descriptor(test, references, descriptors, context) - else: - if reference != descriptor: - raise NMOSTestException(test.FAIL(context + 'Expected value: ' - + str(reference) - + ', actual value: ' - + str(descriptor))) - return - - def _validate_schema(self, test, payload, schema, context=""): - """Delegates to validate_schema. Raises NMOSTestExceptions on error""" - if not schema: - raise NMOSTestException(test.FAIL(context + "Missing schema. ")) - try: - # Validate the JSON schema is correct - self.validate_schema(payload, schema) - except ValidationError as e: - raise NMOSTestException(test.FAIL(context + "Schema validation error: " + e.message)) - except SchemaError as e: - raise NMOSTestException(test.FAIL(context + "Schema error: " + e.message)) - - return - - def get_class_manager_descriptors(self, test, class_manager_oid, property_id, role): - response = self.get_property_value(test, class_manager_oid, property_id, role) - - if not response: - return None - - # Create descriptor dictionary from response array - # Use classId as key if present, otherwise use name - def key_lambda(classId, name): return ".".join(map(str, classId)) if classId else name - descriptors = {key_lambda(r.get('classId'), r['name']): r for r in response} - - return descriptors - - def validate_model_definitions(self, descriptors, schema_name, reference_descriptors): - """Validate class manager model definitions against reference model descriptors. Returns [test result array]""" - results = list() - - reference_descriptor_keys = sorted(reference_descriptors.keys()) - - for key in reference_descriptor_keys: - test = Test("Validate " + str(key) + " definition", "auto_" + str(key)) - try: - if descriptors.get(key): - descriptor = descriptors[key] - - # Validate the JSON schema is correct - self._validate_schema(test, descriptor, self.datatype_schemas[schema_name]) - - # Validate the descriptor is correct - self.validate_descriptor(test, reference_descriptors[key], descriptor) - - results.append(test.PASS()) - else: - results.append(test.UNCLEAR("Not Implemented")) - except NMOSTestException as e: - results.append(e.args[0]) - - return results + self.is12_utils.open_ncp_websocket(test) def get_property_value(self, test, oid, property_id, context): try: @@ -285,73 +124,6 @@ def get_property_value(self, test, oid, property_id, context): + "; " return None - def nc_object_factory(self, test, class_id, oid, role): - """Create NcObject or NcBlock based on class_id""" - runtime_constraints = self.get_property_value(test, - oid, - NcObjectProperties.RUNTIME_PROPERTY_CONSTRAINTS.value, - role + ": ") - - # Check class id to determine if this is a block - if len(class_id) > 1 and class_id[0] == 1 and class_id[1] == 1: - member_descriptors = self.get_property_value(test, oid, NcBlockProperties.MEMBERS.value, role + ": ") - if not member_descriptors: - # An error has likely occured - return None - - nc_block = NcBlock(class_id, oid, role, member_descriptors, runtime_constraints) - - for m in member_descriptors: - child_object = self.nc_object_factory(test, m["classId"], m["oid"], m["role"]) - if child_object: - nc_block.add_child_object(child_object) - return nc_block - else: - # Check to determine if this is a Class Manager - if len(class_id) > 2 and class_id[0] == 1 and class_id[1] == 3 and class_id[2] == 2: - class_descriptors = self.get_class_manager_descriptors(test, - oid, - NcClassManagerProperties.CONTROL_CLASSES.value, - role + ": ") - datatype_descriptors = self.get_class_manager_descriptors(test, - oid, - NcClassManagerProperties.DATATYPES.value, - role + ": ") - if not class_descriptors or not datatype_descriptors: - # An error has likely occured - return None - - return NcClassManager(class_id, oid, role, class_descriptors, datatype_descriptors, runtime_constraints) - return NcObject(class_id, oid, role, runtime_constraints) - - def query_device_model(self, test): - self.create_ncp_socket(test) - if not self.device_model: - self.device_model = self.nc_object_factory(test, - StandardClassIds.NCBLOCK.value, - self.is12_utils.ROOT_BLOCK_OID, - "root") - if not self.device_model: - raise NMOSTestException(test.FAIL("Unable to query Device Model: " - + self.device_model_metadata["error_msg"])) - return self.device_model - - def get_manager(self, test, class_id): - self.create_ncp_socket(test) - device_model = self.query_device_model(test) - members = device_model.find_members_by_class_id(class_id, include_derived=True) - - spec_link = "https://specs.amwa.tv/ms-05-02/branches/{}/docs/Managers.html"\ - .format(self.apis[CONTROL_API_KEY]["spec_branch"]) - - if len(members) == 0: - raise NMOSTestException(test.FAIL("Manager not found in Root Block.", spec_link)) - - if len(members) > 1: - raise NMOSTestException(test.FAIL("Manager MUST be a singleton.", spec_link)) - - return members[0] - def _get_constraints(self, test, class_property, datatype_descriptors, object_runtime_constraints): datatype_constraints = None runtime_constraints = None @@ -374,7 +146,7 @@ def _get_properties(self, test, block, get_constraints=True, get_sequences=False results = [] context += block.role - class_manager = self.get_manager(test, StandardClassIds.NCCLASSMANAGER.value) + class_manager = self.is12_utils.get_class_manager(test) block_member_descriptors = self.is12_utils.get_member_descriptors(test, block.oid, recurse=False) @@ -418,7 +190,7 @@ def _get_methods(self, test, block, context=""): results = [] context += block.role - class_manager = self.get_manager(test, StandardClassIds.NCCLASSMANAGER.value) + class_manager = self.is12_utils.get_class_manager(test) block_member_descriptors = self.is12_utils.get_member_descriptors(test, block.oid, recurse=False) @@ -601,7 +373,7 @@ def _check_parameter_constraints_string(self, test, constrained_property): def _do_check_property_test(self, test, question, get_constraints=False, get_sequences=False): """Test properties within the Device Model""" - device_model = self.query_device_model(test) + device_model = self.is12_utils.query_device_model(test) constrained_properties = self._get_properties(test, device_model, get_constraints=True, get_sequences=False) @@ -648,25 +420,21 @@ def _do_check_property_test(self, test, question, get_constraints=False, get_seq constrained_property['oid'], constrained_property['property_id'], original_value) - except NMOSTestException: - return test.FAIL(constrained_property.get("name") + ": error setting property") + except NMOSTestException as e: + return test.FAIL(constrained_property.get("name") + + ": error setting property: " + + str(e.args[0].detail)) if self.constraint_error: return test.FAIL(self.constraint_error_msg) return test.PASS() - def _resolve_datatype(self, test, datatype): - class_manager = self.get_manager(test, StandardClassIds.NCCLASSMANAGER.value) - if class_manager.datatype_descriptors[datatype].get("parentType"): - return self._resolve_datatype(test, class_manager.datatype_descriptors[datatype].get("parentType")) - return datatype - def _resolve_is_sequence(self, test, datatype): if datatype is None: return False - class_manager = self.get_manager(test, StandardClassIds.NCCLASSMANAGER.value) + class_manager = self.is12_utils.get_class_manager(test) parentType = class_manager.datatype_descriptors[datatype].get("parentType") @@ -723,9 +491,9 @@ def _create_compatible_parameter(self, test, parameter_descriptor): return self._generate_string_parameter(constraints) else: # resolve the datatype to either a struct, enum or primative - datatype = self._resolve_datatype(test, parameter_descriptor['typeName']) + datatype = self.is12_utils.resolve_datatype(test, parameter_descriptor['typeName']) - class_manager = self.get_manager(test, StandardClassIds.NCCLASSMANAGER.value) + class_manager = self.is12_utils.get_class_manager(test) datatype_descriptor = class_manager.datatype_descriptors[datatype] @@ -748,7 +516,7 @@ def _create_compatible_parameters(self, test, parameters): def _do_check_methods_test(self, test, question): """Test methods of non-standard objects within the Device Model""" - device_model = self.query_device_model(test) + device_model = self.is12_utils.query_device_model(test) methods = self._get_methods(test, device_model) @@ -946,7 +714,7 @@ def check_sequence_methods(self, test, oid, property_id, property_name, context= def validate_sequences(self, test): """Test all writable sequences""" - device_model = self.query_device_model(test) + device_model = self.is12_utils.query_device_model(test) constrained_properties = self._get_properties(test, device_model, get_constraints=False, get_sequences=True)