diff --git a/qds_sdk/pipelines.py b/qds_sdk/pipelines.py index f1fa500d..ffe6f64d 100644 --- a/qds_sdk/pipelines.py +++ b/qds_sdk/pipelines.py @@ -339,23 +339,70 @@ def create(cls, pipeline_name, create_type, **kwargs): Args: pipeline_name: Name to be given. - create_type: 1->Assisted, 2->Code, 3->Jar + create_type: 1->Assisted, 2->Jar, 3->Code **kwargs: keyword arguments specific to create type Returns: response """ conn = Qubole.agent() - data = {"data": { - "attributes": - {"name": pipeline_name, "status": "DRAFT", - "create_type": create_type}, - "type": "pipelines"} - } - url = Pipelines.rest_entity_path + "?mode=wizard" + url = Pipelines.rest_entity_path + if create_type is None: + raise ParseError("Provide create_type for Pipeline.", None) + if not kwargs or create_type == 1: + data = { + "data": { + "attributes": { + "name": pipeline_name, + "status": "DRAFT", + "create_type": create_type + }, + "type": "pipeline" + } + } + url = url + "?mode=wizard" + else: + data = { + "data": { + "type": "pipeline", + "attributes": { + "name": pipeline_name, + "create_type": create_type, + "properties": { + "cluster_label": kwargs.get('cluster_label'), + "can_retry": kwargs.get('can_retry'), + "command_line_options": kwargs.get('command_line_options'), + "user_arguments": kwargs.get('user_arguments') + } + }, + "relationships": { + "alerts": { + "data": { + "type": "pipeline/alerts", + "attributes": { + "can_notify": kwargs.get('can_notify'), + "notification_channels": kwargs.get('channel_ids') + } + } + } + } + } + } + if create_type == 2: + data['data']['attributes']['properties']['jar_path'] = \ + kwargs.get('jar_path') + data['data']['attributes']['properties']['main_class_name'] = \ + kwargs.get('main_class_name') + elif create_type == 3: + data['data']['attributes']['properties']['code'] = \ + kwargs.get('code') + data['data']['attributes']['properties']['language'] = \ + kwargs.get('language') + response = conn.post(url, data) cls.pipeline_id = Pipelines.get_pipline_id(response) cls.pipeline_name = pipeline_name + return response @staticmethod def start(pipeline_id): diff --git a/tests/test_quest.py b/tests/test_quest.py index 0ee50385..71dc75ed 100644 --- a/tests/test_quest.py +++ b/tests/test_quest.py @@ -64,7 +64,7 @@ def test_create_pipeline(self): '--cluster-label', 'spark', '-c', 'print("hello")', '--language', 'python', '--user-arguments', 'users_argument'] print_command() d1 = {"data": {"attributes": {"name": "test_pipeline_name", "status": "DRAFT", "create_type": 3}, - "type": "pipelines"}} + "type": "pipeline"}} response = {"relationships": {"nodes": [], "alerts": []}, "included": [], "meta": {"command_details": {"code": "print(\"hello\")", "language": "python"}, "properties": {"checkpoint_location": None, "trigger_interval": None,