diff --git a/test/sample-test/check_notebook_results.py b/test/sample-test/check_notebook_results.py index eb78288cff2..d7bb880b177 100644 --- a/test/sample-test/check_notebook_results.py +++ b/test/sample-test/check_notebook_results.py @@ -12,84 +12,72 @@ # See the License for the specific language governing permissions and # limitations under the License. -import argparse -from kfp import Client import utils -###### Input/Output Instruction ###### -# args: -# experiment: where the test run belong, only necessary when a job is submitted. -# namespace: where the pipeline system is deployed. -# testname: test name in the json xml -# result: name of the file that stores the test result -# exit_code: the exit code of the bash command that runs the test. +from kfp import Client + -# Parsing the input arguments -def parse_arguments(): - """Parse command line arguments.""" +_RUN_LIST_PAGE_SIZE = 1000 +_TEST_TIMEOUT = 1200 - parser = argparse.ArgumentParser() - parser.add_argument('--experiment', - type=str, - help='The experiment name') - parser.add_argument('--namespace', - type=str, - default='kubeflow', - help="namespace of the deployed pipeline system. Default: kubeflow") - parser.add_argument('--testname', - type=str, - required=True, - help="Test name") - parser.add_argument('--result', - type=str, - required=True, - help='The path of the test result that will be exported.') - parser.add_argument('--exit-code', - type=str, - required=True, - help='The exit code of the bash command that runs the test.') - args = parser.parse_args() - return args +class NoteBookChecker(object): + def __init__(self, testname, result, exit_code, + experiment=None, namespace='kubeflow'): + """ Util class for checking notebook sample test running results. -def main(): - args = parse_arguments() - test_cases = [] - test_name = args.testname + ' Sample Test' + :param testname: test name in the json xml. + :param result: name of the file that stores the test result + :param exit_code: the exit code of the notebook run. 0 for passed test. + :param experiment: where the test run belong, only necessary when a job is submitted. + :param namespace: where the pipeline system is deployed. + """ + self._testname = testname + self._result = result + self._exit_code = exit_code + self._experiment = experiment + self._namespace = namespace - ###### Write the script exit code log ###### - utils.add_junit_test(test_cases, 'test script execution', (args.exit_code == '0'), 'test script failure with exit code: ' + args.exit_code) + def check(self): + test_cases = [] + test_name = self._testname + ' Sample Test' - if args.experiment is not None: - ###### Initialization ###### - host = 'ml-pipeline.%s.svc.cluster.local:8888' % args.namespace - client = Client(host=host) + ###### Write the script exit code log ###### + utils.add_junit_test(test_cases, 'test script execution', + (self._exit_code == '0'), + 'test script failure with exit code: ' + + self._exit_code) - ###### Get experiments ###### - experiment_id = client.get_experiment(experiment_name=args.experiment).id + if self._experiment is not None: # Bypassing dsl type check sample. + ###### Initialization ###### + host = 'ml-pipeline.%s.svc.cluster.local:8888' % self._namespace + client = Client(host=host) - ###### Get runs ###### - list_runs_response = client.list_runs(page_size=1000, experiment_id=experiment_id) + ###### Get experiments ###### + experiment_id = client.get_experiment(experiment_name=self._experiment).id - ###### Check all runs ###### - for run in list_runs_response.runs: - run_id = run.id - response = client.wait_for_run_completion(run_id, 1200) - succ = (response.run.status.lower()=='succeeded') - utils.add_junit_test(test_cases, 'job completion', succ, 'waiting for job completion failure') + ###### Get runs ###### + list_runs_response = client.list_runs(page_size=_RUN_LIST_PAGE_SIZE, + experiment_id=experiment_id) - ###### Output Argo Log for Debugging ###### - workflow_json = client._get_workflow_json(run_id) - workflow_id = workflow_json['metadata']['name'] - argo_log, _ = utils.run_bash_command('argo logs -n {} -w {}'.format(args.namespace, workflow_id)) - print("=========Argo Workflow Log=========") - print(argo_log) + ###### Check all runs ###### + for run in list_runs_response.runs: + run_id = run.id + response = client.wait_for_run_completion(run_id, _TEST_TIMEOUT) + succ = (response.run.status.lower()=='succeeded') + utils.add_junit_test(test_cases, 'job completion', + succ, 'waiting for job completion failure') - if not succ: - utils.write_junit_xml(test_name, args.result, test_cases) - exit(1) + ###### Output Argo Log for Debugging ###### + workflow_json = client._get_workflow_json(run_id) + workflow_id = workflow_json['metadata']['name'] + argo_log, _ = utils.run_bash_command( + 'argo logs -n {} -w {}'.format(self._namespace, workflow_id)) + print("=========Argo Workflow Log=========") + print(argo_log) - ###### Write out the test result in junit xml ###### - utils.write_junit_xml(test_name, args.result, test_cases) + if not succ: + utils.write_junit_xml(test_name, self._result, test_cases) + exit(1) -if __name__ == "__main__": - main() \ No newline at end of file + ###### Write out the test result in junit xml ###### + utils.write_junit_xml(test_name, self._result, test_cases) \ No newline at end of file diff --git a/test/sample-test/sample_test_launcher.py b/test/sample-test/sample_test_launcher.py index 4fe3ed7e8a4..355953a93e1 100644 --- a/test/sample-test/sample_test_launcher.py +++ b/test/sample-test/sample_test_launcher.py @@ -23,10 +23,12 @@ import sys import utils +from check_notebook_results import NoteBookChecker + + _PAPERMILL_ERR_MSG = 'An Exception was encountered at' -#TODO(numerology): Add unit-test for classes. class SampleTest(object): """Launch a KFP sample_test provided its name. @@ -43,7 +45,7 @@ class SampleTest(object): TEST_DIR = BASE_DIR + '/test/sample-test' def __init__(self, test_name, results_gcs_dir, target_image_prefix='', - namespace='kubeflow'): + namespace='kubeflow'): self._test_name = test_name self._results_gcs_dir = results_gcs_dir # Capture the first segment after gs:// as the project name. @@ -81,35 +83,22 @@ def check_result(self): def check_notebook_result(self): # Workaround because papermill does not directly return exit code. exit_code = '1' if _PAPERMILL_ERR_MSG in \ - open('%s.ipynb' % self._test_name).read() else '0' + open('%s.ipynb' % self._test_name).read() else '0' os.chdir(self.TEST_DIR) + if self._test_name == 'dsl_static_type_checking': - subprocess.call([ - sys.executable, - 'check_notebook_results.py', - '--testname', - self._test_name, - '--result', - self._sample_test_result, - '--exit-code', - exit_code - ]) + nbchecker = NoteBookChecker(testname=self._test_name, + result=self._sample_test_result, + exit_code=exit_code) + nbchecker.check() else: - subprocess.call([ - sys.executable, - 'check_notebook_results.py', - '--experiment', - '%s-test' % self._test_name, - '--testname', - self._test_name, - '--result', - self._sample_test_result, - '--namespace', - self._namespace, - '--exit-code', - exit_code - ]) + nbchecker = NoteBookChecker(testname=self._test_name, + result=self._sample_test_result, + exit_code=exit_code, + experiment=None, + namespace='kubeflow') + nbchecker.check() print('Copy the test results to GCS %s/' % self._results_gcs_dir) @@ -159,22 +148,22 @@ class ComponentTest(SampleTest): include xgboost_training_cm tfx_cab_classification """ def __init__(self, test_name, results_gcs_dir, - dataflow_tft_image, - dataflow_predict_image, - dataflow_tfma_image, - dataflow_tfdv_image, - dataproc_create_cluster_image, - dataproc_delete_cluster_image, - dataproc_analyze_image, - dataproc_transform_image, - dataproc_train_image, - dataproc_predict_image, - kubeflow_dnntrainer_image, - kubeflow_deployer_image, - local_confusionmatrix_image, - local_roc_image, - target_image_prefix='', - namespace='kubeflow'): + dataflow_tft_image, + dataflow_predict_image, + dataflow_tfma_image, + dataflow_tfdv_image, + dataproc_create_cluster_image, + dataproc_delete_cluster_image, + dataproc_analyze_image, + dataproc_transform_image, + dataproc_train_image, + dataproc_predict_image, + kubeflow_dnntrainer_image, + kubeflow_deployer_image, + local_confusionmatrix_image, + local_roc_image, + target_image_prefix='', + namespace='kubeflow'): super().__init__( test_name=test_name, results_gcs_dir=results_gcs_dir,