diff --git a/ocs_ci/ocs/node.py b/ocs_ci/ocs/node.py
index 8895d9a3a9f..27f5b3a8a42 100644
--- a/ocs_ci/ocs/node.py
+++ b/ocs_ci/ocs/node.py
@@ -486,3 +486,16 @@ def get_node_from_machine_name(machine_name):
     return machine_obj.get().get(
         'status'
     ).get('addresses')[1].get('address')
+
+
+def get_provider():
+    """
+    Return the OCP provider (platform)
+
+    Returns:
+        str: The provider (platform) that OCP is running on
+
+    """
+
+    ocp_cluster = OCP(kind='', resource_name='nodes')
+    return ocp_cluster.get('nodes')['items'][0]['spec']['providerID'].split(':')[0]
diff --git a/ocs_ci/ocs/ocp.py b/ocs_ci/ocs/ocp.py
index bdedda0a01e..578de4fb235 100644
--- a/ocs_ci/ocs/ocp.py
+++ b/ocs_ci/ocs/ocp.py
@@ -841,6 +841,66 @@ def get_logs(
     return output
 
 
+def get_clustername():
+    """
+    Return the name (short DNS name) of the cluster
+
+    Returns:
+        str: the short DNS name of the cluster
+
+    """
+
+    ocp_cluster = OCP(
+        namespace='openshift-console', kind='',
+        resource_name='route')
+    return ocp_cluster.get()['items'][0]['spec']['host'].split('.')[2]
+
+
+def get_ocs_version():
+    """
+    Return the OCS version
+
+    Returns:
+        str: The version of OCS
+
+    """
+
+    ocp_cluster = OCP(
+        namespace=config.ENV_DATA['cluster_namespace'],
+        kind='', resource_name='csv')
+    return ocp_cluster.get()['items'][0]['spec']['version']
+
+
+def get_build():
+    """
+    Return the OCP build version
+
+    Returns:
+        str: The build version of OCP
+
+    """
+
+    ocp_cluster = OCP(
+        namespace=config.ENV_DATA['cluster_namespace'],
+        kind='', resource_name='clusterversion')
+    return ocp_cluster.get()['items'][0]['status']['desired']['version']
+
+
+def get_ocp_channel():
+    """
+    Return the OCP channel
+
+    Returns:
+        str: The channel of OCP
+
+    """
+
+    ocp_cluster = OCP(
+        namespace=config.ENV_DATA['cluster_namespace'],
+        kind='', resource_name='clusterversion')
+    return ocp_cluster.get()['items'][0]['spec']['channel']
+
+
 def switch_to_project(project_name):
     """
     Switch to another project
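A minimal sketch of how these new helpers might be combined by a caller; the cluster_metadata dict and its key names are illustrative only and not part of this patch (get_ocp_version is the existing helper from ocs_ci.utility.utils that the test below already imports):

    from ocs_ci.ocs.ocp import (
        get_clustername, get_ocs_version, get_build, get_ocp_channel
    )
    from ocs_ci.ocs.node import get_provider
    from ocs_ci.utility.utils import get_ocp_version

    # Collect the environment description once, so it can be attached to
    # every benchmark result document pushed to Elasticsearch.
    cluster_metadata = {
        'clustername': get_clustername(),  # short DNS name of the cluster
        'ocs_version': get_ocs_version(),  # version from the OCS CSV
        'ocp_version': get_ocp_version(),  # OCP version (existing helper)
        'ocp_build': get_build(),          # desired clusterversion build
        'ocp_channel': get_ocp_channel(),  # OCP upgrade channel
        'vendor': get_provider(),          # platform from the node providerID
    }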
diff --git a/ocs_ci/templates/workloads/smallfile/SmallFile.yaml b/ocs_ci/templates/workloads/smallfile/SmallFile.yaml
index f782cfe3921..cb9e5bee4d2 100644
--- a/ocs_ci/templates/workloads/smallfile/SmallFile.yaml
+++ b/ocs_ci/templates/workloads/smallfile/SmallFile.yaml
@@ -9,11 +9,14 @@ metadata:
   namespace: my-ripsaw
 spec:
   test_user: homer_simpson
-  clustername: aws-dec26-2019
+  clustername: place-holder
   elasticsearch:
-    es: es_server
-    es_port: 9090
-    es_index: smallfile
+    #server: 10.0.144.152 <- debugging server
+    server: 10.0.78.167
+    port: 9200
+    es_index: ripsaw-smallfile
+    metadata_collection: true
+    index_data: true
   workload:
     name: smallfile
     args:
diff --git a/requirements.txt b/requirements.txt
index d6e1198b1ab..c038b2b81ac 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -1 +1,3 @@
 -e .
+elasticsearch
+numpy
\ No newline at end of file
diff --git a/tests/e2e/performance/test_small_file_workload.py b/tests/e2e/performance/test_small_file_workload.py
index 64d9540c279..99c8c0f618d 100644
--- a/tests/e2e/performance/test_small_file_workload.py
+++ b/tests/e2e/performance/test_small_file_workload.py
@@ -5,8 +5,10 @@
 import pytest
 import time
 
-from ocs_ci.ocs.ocp import OCP
-from ocs_ci.utility.utils import TimeoutSampler
+from ocs_ci.ocs.ocp import (OCP, get_clustername, get_ocs_version,
+                            get_build, get_ocp_channel)
+from ocs_ci.ocs.node import get_provider
+from ocs_ci.utility.utils import TimeoutSampler, get_ocp_version, run_cmd
 from ocs_ci.ocs.resources.ocs import OCS
 from ocs_ci.utility import templating
 from ocs_ci.ocs.utils import get_pod_name_by_pattern
@@ -14,15 +16,307 @@
 from ocs_ci.ocs import constants
 from ocs_ci.framework.testlib import E2ETest, performance
 from tests.helpers import get_logs_with_errors
+from elasticsearch import (Elasticsearch, exceptions as ESExp)
+import numpy as np
 
 log = logging.getLogger(__name__)
 
 
+class SmallFileResultsAnalyse(object):
+    """
+    This class reads all test results from the Elasticsearch server (which
+    the ripsaw benchmark run generates), and aggregates them by:
+        test operation (e.g. create / delete etc.)
+        sample (for the test to be valid it needs more than one sample)
+        host (the test can run on more than one pod {called host})
+
+    It generates results for the whole test as one unit, which is valid only
+    if the deviation between samples is at most 5%
+
+    """
+
+    managed_keys = {
+        'IOPS': {'name': 'iops', 'op': np.sum},
+        'MiBps': {'name': 'mbps', 'op': np.sum},
+        'elapsed': {'name': 'elapsed-time', 'op': np.average},
+        'files': {'name': 'Files-per-thread', 'op': np.sum},
+        'files-per-sec': {'name': 'Files-per-sec', 'op': np.sum},
+        'records': {'name': 'Rec-per-thread', 'op': np.sum},
+    }
+
+    def __init__(self, uuid, crd):
+        """
+        Initialize the object by reading some of the data from the CRD file
+        and by connecting to the Elasticsearch server.
+
+        Args:
+            uuid (str): the unique uid of the test
+            crd (dict): dictionary of test parameters - the test yaml file,
+                as modified by the test itself.
+
+        """
+
+        self.uuid = uuid
+
+        self.server = crd['spec']['elasticsearch']['server']
+        self.port = crd['spec']['elasticsearch']['port']
+        self.index = crd['spec']['elasticsearch']['es_index'] + '-results'
+        self.new_index = crd['spec']['elasticsearch']['es_index'] + '-fullres'
+        self.all_results = {}
+
+        # Make sure we have a connection to the Elasticsearch server
+        log.info(f'Connecting to ES {self.server} on port {self.port}')
+        try:
+            self.es = Elasticsearch([{'host': self.server, 'port': self.port}])
+        except ESExp.ConnectionError:
+            log.error('can not connect to ES server {}:{}'.format(
+                self.server, self.port))
+            raise
+
+        # Creating the full results dictionary
+        self.results = {
+            'clustername': crd['spec']['clustername'],
+            'clients': crd['spec']['workload']['args']['clients'],
+            'samples': crd['spec']['workload']['args']['samples'],
+            'threads': crd['spec']['workload']['args']['threads'],
+            'operations': crd['spec']['workload']['args']['operation'],
+            'uuid': uuid,
+            'full-res': {}
+        }
+
+        # Calculate the number of records for the test
+        self.records = self.results['clients'] * self.results['threads']
+        self.records *= self.results['samples']
+        self.records *= len(self.results['operations'])
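For orientation, self.records is the number of per-thread result documents the benchmark is expected to index, and read() below passes it as the query size. A quick worked example with invented CR values (not taken from this patch):

    clients, threads, samples = 2, 4, 3
    operations = ['create', 'read', 'delete']
    # one result document per (pod, thread, sample, operation)
    records = clients * threads * samples * len(operations)  # 2 * 4 * 3 * 3 = 72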
+
+    def add_key(self, key, value):
+        """
+        Add (key and value) to this object's results dictionary as a new
+        dictionary.
+
+        Args:
+            key (str): String which will be the key for the value
+            value (*): value to add, can be any kind of data type
+
+        """
+        self.results.update({key: value})
+
+    def read(self):
+        """
+        Read all test records from the Elasticsearch server into a dictionary
+        inside this object
+
+        """
+
+        query = {'query': {'match': {'uuid': self.uuid}}}
+        log.info('Reading all data from ES server')
+        self.all_results = self.es.search(
+            index=self.index, body=query, size=self.records
+        )
+
+    def write(self):
+        """
+        Write the results to the Elasticsearch server
+
+        """
+        log.info('Writing all data to ES server')
+        log.info(f'The results data is {self.results}')
+        self.es.index(index=self.new_index, doc_type='_doc', body=self.results)
+
+    def thread_read(self, host, op, snum):
+        """
+        Read all thread records for one host / operation / sample
+
+        Args:
+            host (str): the name of the pod that ran the test
+            op (str): the operation that is tested
+            snum (int): the sample number of the test
+
+        Returns:
+            dict : dictionary of results records
+
+        """
+
+        res = {}
+        log.debug(f'Reading all threads for {op} / {snum} / {host}')
+        for hit in self.all_results['hits']['hits']:
+
+            if (
+                hit['_source']['host'] == host and hit['_source'][
+                    'optype'] == op and hit['_source']['sample'] == snum
+            ):
+                for key in self.managed_keys.keys():
+                    # Not all operations have all values, so use try/except
+                    try:
+                        val = float('{:.2f}'.format(hit['_source'][key]))
+                        if self.managed_keys[key]['name'] in res.keys():
+                            res[self.managed_keys[key]['name']].append(val)
+                        else:
+                            res[self.managed_keys[key]['name']] = [val]
+                    except Exception:
+                        pass
+        res = self.aggregate_threads_results(res)
+        return res
+
+    def aggregate_threads_results(self, res):
+        """
+        Aggregate one section of results; this can be threads in a host,
+        hosts in a sample, or samples in a test
+
+        Args:
+            res (dict) : dictionary of results
+
+        Returns:
+            dict : dictionary with the aggregated results.
+
+        """
+
+        results = {}
+        for key in self.managed_keys.keys():
+            if self.managed_keys[key]['name'] in res.keys():
+                results[key] = self.managed_keys[key]['op'](
+                    res[self.managed_keys[key]['name']]
+                )
+
+        # This is the place to check in-host (threads) deviation.
+
+        return results
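A toy illustration (all numbers invented) of how managed_keys drives the aggregation above: per-thread IOPS are summed into a host total, while the per-thread elapsed times are averaged:

    import numpy as np

    # Hypothetical values collected by thread_read() for one
    # host / operation / sample, keyed by the managed_keys 'name' fields:
    res = {'iops': [1200.0, 1150.0, 1300.0, 1250.0],
           'elapsed-time': [41.2, 43.0, 40.8, 42.5]}

    # aggregate_threads_results() applies the 'op' from managed_keys:
    aggregated = {
        'IOPS': np.sum(res['iops']),                 # 4900.0 - host total
        'elapsed': np.average(res['elapsed-time']),  # ~41.9  - mean run time
    }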
+
+    def combine_results(self, results, clear):
+        """
+        Combine two or more results (hosts in a sample / samples in a test)
+        into one result.
+
+        Args:
+            results (dict): dictionary of results to combine
+            clear (bool): whether to return only the combined results.
+                True - return only the combined results
+                False - add the combined results to the original results
+
+        Returns:
+            dict : dictionary of results records
+
+        """
+
+        res = {}
+        log.info(f'The results to combine {results}')
+        for rec in results.keys():
+            record = results[rec]
+            for key in self.managed_keys.keys():
+                # Not all operations have all values, so use try/except
+                try:
+                    val = float('{:.2f}'.format(record[key]))
+                    if self.managed_keys[key]['name'] in res.keys():
+                        res[self.managed_keys[key]['name']].append(val)
+                    else:
+                        res[self.managed_keys[key]['name']] = [val]
+                except Exception:
+                    pass
+        if not clear:
+            res.update(self.aggregate_threads_results(res))
+        else:
+            res = self.aggregate_threads_results(res)
+        return res
+
+    def aggregate_host_results(self):
+        """
+        Aggregate results from all hosts in a single sample
+
+        """
+
+        results = {}
+
+        for op in self.results['operations']:
+            for smp in range(self.results['samples']):
+                sample = smp + 1
+                if op in self.results['full-res'].keys():
+                    self.results['full-res'][op][sample] = self.combine_results(
+                        self.results['full-res'][op][sample], True)
+
+        return results
+
+    def aggregate_samples_results(self):
+        """
+        Aggregate the results from all hosts in a single sample, and compare
+        between samples.
+
+        Returns:
+            bool: True if the results deviation (between samples) is less
+                than or equal to 5%, otherwise False
+
+        """
+
+        test_pass = True
+        for op in self.results["operations"]:
+            log.info(f'Aggregating {op} - {self.results["full-res"][op]}')
+            results = self.combine_results(self.results["full-res"][op], False)
+
+            log.info(f'Check IOPS {op} samples deviation')
+
+            for key in self.managed_keys.keys():
+                if self.managed_keys[key]["name"] in results.keys():
+                    results[key] = np.average(
+                        results[self.managed_keys[key]["name"]]
+                    )
+                    if key == "IOPS":
+                        max = np.ma.max(results[self.managed_keys[key]["name"]])
+                        min = np.ma.min(results[self.managed_keys[key]["name"]])
+
+                        dev = (max - min) * 100 / min
+                        if dev > 5:
+                            log.error(
+                                f'Deviation for {op} IOPS is more than 5% ({dev})')
+                            test_pass = False
+                    del results[self.managed_keys[key]["name"]]
+            self.results["full-res"][op] = results
+
+        return test_pass
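A small numeric example of the 5% rule above (values invented): if the per-sample IOPS averages come out to 4900, 5100 and 5000, the spread is measured against the smallest sample:

    samples_iops = [4900.0, 5100.0, 5000.0]
    dev = (max(samples_iops) - min(samples_iops)) * 100 / min(samples_iops)
    # dev = (5100 - 4900) * 100 / 4900 ~= 4.08 -> within the 5% limit,
    # so the samples are considered consistent and test_pass stays True.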
+
+    def get_clients_list(self):
+        """
+        Find and create a list of all hosts (pods) that were used in this test
+
+        Returns:
+            list: a list of pod names
+
+        """
+
+        res = []
+        for hit in self.all_results['hits']['hits']:
+            host = hit['_source']['host']
+            if host not in res:
+                res.append(host)
+        log.info(f'The pod names used in this test are {res}')
+        return res
+
+    def init_full_results(self):
+        """
+        Initialize the full results internal DB as a dictionary.
+
+        """
+
+        log.info('Initializing results DB')
+
+        # Top level of the internal results DB is the operation
+        for op in self.results['operations']:
+            self.results['full-res'][op] = {}
+
+            # Second level is the sample
+            for smp in range(self.results['samples']):
+                sample = smp + 1
+                self.results['full-res'][op][sample] = {}
+
+                # Last level is the host (all threads are within the host)
+                for host in self.results['hosts']:
+                    self.results['full-res'][op][sample][
+                        host] = self.thread_read(host, op, sample)
+
+
 @pytest.fixture(scope='function')
 def ripsaw(request, storageclass_factory):
-
     def teardown():
         ripsaw.cleanup()
+
     request.addfinalizer(teardown)
     ripsaw = RipSaw()
@@ -44,36 +338,40 @@ class TestSmallFileWorkload(E2ETest):
         argnames=["file_size", "files", "threads", "samples", "interface"],
         argvalues=[
             pytest.param(
-                *[4, 50000, 4, 1, constants.CEPHBLOCKPOOL],
+                *[4, 50000, 4, 3, constants.CEPHBLOCKPOOL],
                 marks=pytest.mark.polarion_id("OCS-1295"),
             ),
+
             pytest.param(
-                *[16, 50000, 4, 1, constants.CEPHBLOCKPOOL],
+                *[16, 50000, 4, 3, constants.CEPHBLOCKPOOL],
                 marks=pytest.mark.polarion_id("OCS-2020"),
             ),
             pytest.param(
-                *[16, 200000, 4, 1, constants.CEPHBLOCKPOOL],
+                *[16, 200000, 4, 3, constants.CEPHBLOCKPOOL],
                 marks=pytest.mark.polarion_id("OCS-2021"),
             ),
             pytest.param(
-                *[4, 50000, 4, 1, constants.CEPHFILESYSTEM],
+                *[4, 50000, 4, 3, constants.CEPHFILESYSTEM],
                 marks=pytest.mark.polarion_id("OCS-2022"),
             ),
             pytest.param(
-                *[16, 50000, 4, 1, constants.CEPHFILESYSTEM],
+                *[16, 50000, 4, 3, constants.CEPHFILESYSTEM],
                 marks=pytest.mark.polarion_id("OCS-2023"),
             ),
-            pytest.param(
-                *[16, 200000, 4, 1, constants.CEPHFILESYSTEM],
-                marks=pytest.mark.polarion_id("OCS-2024"),
-            ),
+
         ]
     )
     @pytest.mark.polarion_id("OCS-1295")
-    def test_smallfile_workload(self, ripsaw, file_size, files, threads, samples, interface):
+    def test_smallfile_workload(self, ripsaw, file_size, files, threads,
+                                samples, interface):
         """
         Run SmallFile Workload
         """
+
+        # Get the name and email of the user that is running the test
+        user = run_cmd('git config --get user.name').strip()
+        email = run_cmd('git config --get user.email').strip()
+
         log.info("Apply Operator CRD")
         ripsaw.apply_crd('resources/crds/ripsaw_v1alpha1_ripsaw_crd.yaml')
         sf_data = templating.load_yaml(constants.SMALLFILE_BENCHMARK_YAML)
@@ -92,9 +390,15 @@ def test_smallfile_workload(self, ripsaw, file_size, files, threads, samples, in
         sf_data['spec']['workload']['args']['files'] = files
         sf_data['spec']['workload']['args']['threads'] = threads
         sf_data['spec']['workload']['args']['samples'] = samples
-        """ Calculating the size of the volume that need to be test, it should be at least twice in the size then the
-        size of the files, and at least 100Gi.
-        since the file_size is in Kb and the vol_size need to be in Gb, more calculation is needed.
+        sf_data['spec']['clustername'] = get_clustername()
+        sf_data['spec']['test_user'] = f'{user}<{email}>'
+        """
+        Calculate the size of the volume that needs to be tested. It should
+        be at least twice the total size of the files, and at least
+        100Gi.
+
+        Since the file_size is in KB and the vol_size needs to be in GB,
+        more calculation is needed.
         """
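A worked example of this conversion for one of the parametrized cases above (file_size=16KB, files=200000, threads=4), assuming constants.GB2KB equals 1024 * 1024 (the constant's value is not shown in this patch):

    file_size, files, threads = 16, 200000, 4
    vol_size = int(files * threads * file_size * 3)  # 38,400,000 KB
    vol_size = int(vol_size / (1024 * 1024))         # 36 GB
    # 36 is well under the 100Gi minimum described above, so the unchanged
    # code that follows is expected to bump the PVC request up to 100Gi.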
""" vol_size = int(files * threads * file_size * 3) vol_size = int(vol_size / constants.GB2KB) @@ -104,6 +408,8 @@ def test_smallfile_workload(self, ripsaw, file_size, files, threads, samples, in sf_obj = OCS(**sf_data) sf_obj.create() + log.info(f'The smallfile yaml file is {sf_data}') + # wait for benchmark pods to get created - takes a while for bench_pod in TimeoutSampler( 120, 3, get_pod_name_by_pattern, 'smallfile-client', 'my-ripsaw' @@ -125,16 +431,50 @@ def test_smallfile_workload(self, ripsaw, file_size, files, threads, samples, in ) start_time = time.time() timeout = 1800 + + # Getting the UUID from inside the benchmark pod + output = bench_pod.exec_oc_cmd(f'exec {small_file_client_pod} env') + for line in output.split(): + if 'uuid=' in line: + uuid = line.split('=')[1] + log.info(f'the UUID of the test is : {uuid}') + full_results = SmallFileResultsAnalyse(uuid, sf_data) + + # Initialaize the results doc file. + full_results.add_key('user', sf_data['spec']['test_user']) + full_results.add_key('ocp_version', get_ocp_version()) + full_results.add_key('ocp_build', get_build()) + full_results.add_key('ocp_channel', get_ocp_channel()) + full_results.add_key('ocs_version', get_ocs_version()) + full_results.add_key('vendor', get_provider()) + full_results.add_key('start_time', + time.strftime('%Y-%m-%dT%H:%M:%SGMT', + time.gmtime())) + full_results.add_key('global_options', { + 'files': files, + 'file_size': file_size, + 'storageclass': sf_data['spec']['workload']['args']['storageclass'], + 'vol_size': sf_data['spec']['workload']['args']['storagesize'] + }) + while True: logs = bench_pod.exec_oc_cmd( f'logs {small_file_client_pod}', out_yaml_format=False ) if "RUN STATUS DONE" in logs: - log.info("SmallFile Benchmark Completed Successfully") + full_results.add_key('end_time', + time.strftime('%Y-%m-%dT%H:%M:%SGMT', + time.gmtime())) + full_results.read() + full_results.add_key('hosts', full_results.get_clients_list()) + full_results.init_full_results() + full_results.aggregate_host_results() + test_status = full_results.aggregate_samples_results() + full_results.write() break if timeout < (time.time() - start_time): - raise TimeoutError(f"Timed out waiting for benchmark to complete") + raise TimeoutError("Timed out waiting for benchmark to complete") time.sleep(30) - assert not get_logs_with_errors() + assert (not get_logs_with_errors() and test_status), 'Test Failed'