Skip to content

Commit

Permalink
Refactor check_notebook_results.py into python module. (kubeflow#1947)
Browse files Browse the repository at this point in the history
* Refactor check_notebook_results.py into python module.

* Fix indentation.

* Fix indentation.

* Fix indentation.

* Fix indentation.
  • Loading branch information
Jiaxiao Zheng authored and ajchili committed Aug 28, 2019
1 parent 2943278 commit ee4d16a
Show file tree
Hide file tree
Showing 2 changed files with 87 additions and 110 deletions.
122 changes: 55 additions & 67 deletions test/sample-test/check_notebook_results.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,84 +12,72 @@
# See the License for the specific language governing permissions and
# limitations under the License.

import argparse
from kfp import Client
import utils

###### Input/Output Instruction ######
# args:
# experiment: where the test run belong, only necessary when a job is submitted.
# namespace: where the pipeline system is deployed.
# testname: test name in the json xml
# result: name of the file that stores the test result
# exit_code: the exit code of the bash command that runs the test.
from kfp import Client


# Parsing the input arguments
def parse_arguments():
"""Parse command line arguments."""
_RUN_LIST_PAGE_SIZE = 1000
_TEST_TIMEOUT = 1200

parser = argparse.ArgumentParser()
parser.add_argument('--experiment',
type=str,
help='The experiment name')
parser.add_argument('--namespace',
type=str,
default='kubeflow',
help="namespace of the deployed pipeline system. Default: kubeflow")
parser.add_argument('--testname',
type=str,
required=True,
help="Test name")
parser.add_argument('--result',
type=str,
required=True,
help='The path of the test result that will be exported.')
parser.add_argument('--exit-code',
type=str,
required=True,
help='The exit code of the bash command that runs the test.')
args = parser.parse_args()
return args
class NoteBookChecker(object):
def __init__(self, testname, result, exit_code,
experiment=None, namespace='kubeflow'):
""" Util class for checking notebook sample test running results.
def main():
args = parse_arguments()
test_cases = []
test_name = args.testname + ' Sample Test'
:param testname: test name in the json xml.
:param result: name of the file that stores the test result
:param exit_code: the exit code of the notebook run. 0 for passed test.
:param experiment: where the test run belong, only necessary when a job is submitted.
:param namespace: where the pipeline system is deployed.
"""
self._testname = testname
self._result = result
self._exit_code = exit_code
self._experiment = experiment
self._namespace = namespace

###### Write the script exit code log ######
utils.add_junit_test(test_cases, 'test script execution', (args.exit_code == '0'), 'test script failure with exit code: ' + args.exit_code)
def check(self):
test_cases = []
test_name = self._testname + ' Sample Test'

if args.experiment is not None:
###### Initialization ######
host = 'ml-pipeline.%s.svc.cluster.local:8888' % args.namespace
client = Client(host=host)
###### Write the script exit code log ######
utils.add_junit_test(test_cases, 'test script execution',
(self._exit_code == '0'),
'test script failure with exit code: '
+ self._exit_code)

###### Get experiments ######
experiment_id = client.get_experiment(experiment_name=args.experiment).id
if self._experiment is not None: # Bypassing dsl type check sample.
###### Initialization ######
host = 'ml-pipeline.%s.svc.cluster.local:8888' % self._namespace
client = Client(host=host)

###### Get runs ######
list_runs_response = client.list_runs(page_size=1000, experiment_id=experiment_id)
###### Get experiments ######
experiment_id = client.get_experiment(experiment_name=self._experiment).id

###### Check all runs ######
for run in list_runs_response.runs:
run_id = run.id
response = client.wait_for_run_completion(run_id, 1200)
succ = (response.run.status.lower()=='succeeded')
utils.add_junit_test(test_cases, 'job completion', succ, 'waiting for job completion failure')
###### Get runs ######
list_runs_response = client.list_runs(page_size=_RUN_LIST_PAGE_SIZE,
experiment_id=experiment_id)

###### Output Argo Log for Debugging ######
workflow_json = client._get_workflow_json(run_id)
workflow_id = workflow_json['metadata']['name']
argo_log, _ = utils.run_bash_command('argo logs -n {} -w {}'.format(args.namespace, workflow_id))
print("=========Argo Workflow Log=========")
print(argo_log)
###### Check all runs ######
for run in list_runs_response.runs:
run_id = run.id
response = client.wait_for_run_completion(run_id, _TEST_TIMEOUT)
succ = (response.run.status.lower()=='succeeded')
utils.add_junit_test(test_cases, 'job completion',
succ, 'waiting for job completion failure')

if not succ:
utils.write_junit_xml(test_name, args.result, test_cases)
exit(1)
###### Output Argo Log for Debugging ######
workflow_json = client._get_workflow_json(run_id)
workflow_id = workflow_json['metadata']['name']
argo_log, _ = utils.run_bash_command(
'argo logs -n {} -w {}'.format(self._namespace, workflow_id))
print("=========Argo Workflow Log=========")
print(argo_log)

###### Write out the test result in junit xml ######
utils.write_junit_xml(test_name, args.result, test_cases)
if not succ:
utils.write_junit_xml(test_name, self._result, test_cases)
exit(1)

if __name__ == "__main__":
main()
###### Write out the test result in junit xml ######
utils.write_junit_xml(test_name, self._result, test_cases)
75 changes: 32 additions & 43 deletions test/sample-test/sample_test_launcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,12 @@
import sys
import utils

from check_notebook_results import NoteBookChecker


_PAPERMILL_ERR_MSG = 'An Exception was encountered at'


#TODO(numerology): Add unit-test for classes.
class SampleTest(object):
"""Launch a KFP sample_test provided its name.
Expand All @@ -43,7 +45,7 @@ class SampleTest(object):
TEST_DIR = BASE_DIR + '/test/sample-test'

def __init__(self, test_name, results_gcs_dir, target_image_prefix='',
namespace='kubeflow'):
namespace='kubeflow'):
self._test_name = test_name
self._results_gcs_dir = results_gcs_dir
# Capture the first segment after gs:// as the project name.
Expand Down Expand Up @@ -81,35 +83,22 @@ def check_result(self):
def check_notebook_result(self):
# Workaround because papermill does not directly return exit code.
exit_code = '1' if _PAPERMILL_ERR_MSG in \
open('%s.ipynb' % self._test_name).read() else '0'
open('%s.ipynb' % self._test_name).read() else '0'

os.chdir(self.TEST_DIR)

if self._test_name == 'dsl_static_type_checking':
subprocess.call([
sys.executable,
'check_notebook_results.py',
'--testname',
self._test_name,
'--result',
self._sample_test_result,
'--exit-code',
exit_code
])
nbchecker = NoteBookChecker(testname=self._test_name,
result=self._sample_test_result,
exit_code=exit_code)
nbchecker.check()
else:
subprocess.call([
sys.executable,
'check_notebook_results.py',
'--experiment',
'%s-test' % self._test_name,
'--testname',
self._test_name,
'--result',
self._sample_test_result,
'--namespace',
self._namespace,
'--exit-code',
exit_code
])
nbchecker = NoteBookChecker(testname=self._test_name,
result=self._sample_test_result,
exit_code=exit_code,
experiment=None,
namespace='kubeflow')
nbchecker.check()

print('Copy the test results to GCS %s/' % self._results_gcs_dir)

Expand Down Expand Up @@ -159,22 +148,22 @@ class ComponentTest(SampleTest):
include xgboost_training_cm tfx_cab_classification
"""
def __init__(self, test_name, results_gcs_dir,
dataflow_tft_image,
dataflow_predict_image,
dataflow_tfma_image,
dataflow_tfdv_image,
dataproc_create_cluster_image,
dataproc_delete_cluster_image,
dataproc_analyze_image,
dataproc_transform_image,
dataproc_train_image,
dataproc_predict_image,
kubeflow_dnntrainer_image,
kubeflow_deployer_image,
local_confusionmatrix_image,
local_roc_image,
target_image_prefix='',
namespace='kubeflow'):
dataflow_tft_image,
dataflow_predict_image,
dataflow_tfma_image,
dataflow_tfdv_image,
dataproc_create_cluster_image,
dataproc_delete_cluster_image,
dataproc_analyze_image,
dataproc_transform_image,
dataproc_train_image,
dataproc_predict_image,
kubeflow_dnntrainer_image,
kubeflow_deployer_image,
local_confusionmatrix_image,
local_roc_image,
target_image_prefix='',
namespace='kubeflow'):
super().__init__(
test_name=test_name,
results_gcs_dir=results_gcs_dir,
Expand Down

0 comments on commit ee4d16a

Please sign in to comment.