Skip to content

Commit

Permalink
Quest-674: Changes to support BYOC/BYOJ pipeline creation (#325)
Browse files Browse the repository at this point in the history
  • Loading branch information
siddharth1001 authored Jun 22, 2020
1 parent 96d16fd commit fe5c4da
Show file tree
Hide file tree
Showing 2 changed files with 56 additions and 9 deletions.
63 changes: 55 additions & 8 deletions qds_sdk/pipelines.py
Original file line number Diff line number Diff line change
Expand Up @@ -339,23 +339,70 @@ def create(cls, pipeline_name, create_type, **kwargs):
Args:
pipeline_name: Name to be given.
create_type: 1->Assisted, 2->Code, 3->Jar
create_type: 1->Assisted, 2->Jar, 3->Code
**kwargs: keyword arguments specific to create type
Returns:
response
"""
conn = Qubole.agent()
data = {"data": {
"attributes":
{"name": pipeline_name, "status": "DRAFT",
"create_type": create_type},
"type": "pipelines"}
}
url = Pipelines.rest_entity_path + "?mode=wizard"
url = Pipelines.rest_entity_path
if create_type is None:
raise ParseError("Provide create_type for Pipeline.", None)
if not kwargs or create_type == 1:
data = {
"data": {
"attributes": {
"name": pipeline_name,
"status": "DRAFT",
"create_type": create_type
},
"type": "pipeline"
}
}
url = url + "?mode=wizard"
else:
data = {
"data": {
"type": "pipeline",
"attributes": {
"name": pipeline_name,
"create_type": create_type,
"properties": {
"cluster_label": kwargs.get('cluster_label'),
"can_retry": kwargs.get('can_retry'),
"command_line_options": kwargs.get('command_line_options'),
"user_arguments": kwargs.get('user_arguments')
}
},
"relationships": {
"alerts": {
"data": {
"type": "pipeline/alerts",
"attributes": {
"can_notify": kwargs.get('can_notify'),
"notification_channels": kwargs.get('channel_ids')
}
}
}
}
}
}
if create_type == 2:
data['data']['attributes']['properties']['jar_path'] = \
kwargs.get('jar_path')
data['data']['attributes']['properties']['main_class_name'] = \
kwargs.get('main_class_name')
elif create_type == 3:
data['data']['attributes']['properties']['code'] = \
kwargs.get('code')
data['data']['attributes']['properties']['language'] = \
kwargs.get('language')

response = conn.post(url, data)
cls.pipeline_id = Pipelines.get_pipline_id(response)
cls.pipeline_name = pipeline_name
return response

@staticmethod
def start(pipeline_id):
Expand Down
2 changes: 1 addition & 1 deletion tests/test_quest.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ def test_create_pipeline(self):
'--cluster-label', 'spark', '-c', 'print("hello")', '--language', 'python', '--user-arguments', 'users_argument']
print_command()
d1 = {"data": {"attributes": {"name": "test_pipeline_name", "status": "DRAFT", "create_type": 3},
"type": "pipelines"}}
"type": "pipeline"}}
response = {"relationships": {"nodes": [], "alerts": []}, "included": [],
"meta": {"command_details": {"code": "print(\"hello\")", "language": "python"},
"properties": {"checkpoint_location": None, "trigger_interval": None,
Expand Down

0 comments on commit fe5c4da

Please sign in to comment.