Skip to content

Commit

Permalink
adding quest-test changes
Browse files Browse the repository at this point in the history
  • Loading branch information
harshits committed Jan 14, 2020
1 parent 4dfbd67 commit 874888e
Showing 1 changed file with 52 additions and 25 deletions.
77 changes: 52 additions & 25 deletions qds_sdk/quest.py
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,7 @@ class Quest(Resource):
pipeline_name = None
pipeline_code = None
jar_path = None

@staticmethod
def get_pipline_id(response):
return str(response.get('data').get('id'))
Expand Down Expand Up @@ -278,7 +279,6 @@ def pause(pipeline_id):
conn = Qubole.agent()
return conn.put(url)


@staticmethod
def archive(pipeline_id):
"""
Expand Down Expand Up @@ -612,7 +612,7 @@ def create_pipeline(pipeline_name, schema, source_data_format, sink_data_format,
trigger_interval=None,
can_retry=True,
command_line_options=None,
operator=None,
operators=None,
channel_id=None,
endpoint_url=None,
stream_name=None,
Expand Down Expand Up @@ -640,12 +640,13 @@ def create_pipeline(pipeline_name, schema, source_data_format, sink_data_format,
value=None,
filter_column_name=None,
select_column_names=None,
frequency=None,
watermark_frequency=None,
sliding_window_value_frequency=None,
window_interval_frequency=None,
other_columns=None):
"""
:param watermark_frequency:
:param sink_topics:
:param source_topics:
:param sink_data_format:
Expand All @@ -662,7 +663,7 @@ def create_pipeline(pipeline_name, schema, source_data_format, sink_data_format,
:param trigger_interval:
:param can_retry:
:param command_line_options:
:param operator:
:param operators: dict of operators
:param channel_id:
:param endpoint_url:
:param stream_name:
Expand Down Expand Up @@ -737,16 +738,32 @@ def create_pipeline(pipeline_name, schema, source_data_format, sink_data_format,
command_line_options=command_line_options)
log.info(property_response)
final_response = property_response
if operator:
operator_response = QuestAssisted.add_operator(pipeline_id, operator,
condition=condition,
value=value,
filter_column_name=filter_column_name,
select_column_names=select_column_names,
sliding_window_value_frequency=sliding_window_value_frequency,
window_interval_frequency=window_interval_frequency,
other_columns=other_columns)
if operators:
for operator, value in operators.items():
if operator is "filter":
operator_response = QuestAssisted.add_operator(pipeline_id, operator,
condition=value["condition"],
value=value["value"],
filter_column_name=value["column_name"])
elif operator is "select":
operator_response = QuestAssisted.add_operator(pipeline_id, operator,
select_column_names=value["column_names"])
elif operator is "watermark":
operator_response = QuestAssisted.add_operator(pipeline_id, operator,
watermark_column_name=value["column_names"],
watermark_frequency=value["frequency"])

elif operator is "windowed_group":
operator_response = QuestAssisted.add_operator(pipeline_id, operator,
groupby_column_name=value["column_name"],
sliding_window_value=value["sliding_window_value"],
window_interval_frequency=value["window_interval_frequency"],
other_columns=value["other_columns"])
else:
raise ParseError("Please enter valid operator value. Valid values are [filter, select, watermark, windowed_group]")

log.info(operator_response)

final_response = operator_response
if channel_id:
response = QuestAssisted.set_alert(pipeline_id, channel_id)
Expand All @@ -761,19 +778,24 @@ def add_operator(pipeline_id, operator,
value=None,
filter_column_name=None,
select_column_names=None,
sliding_window_value_frequency=None,
sliding_window_value=None,
window_interval_frequency=None,
other_columns=None):
other_columns=None,
watermark_frequency=None,
groupby_column_name=None,
watermark_column_name=None):
"""
:param watermark_column_name:
:param groupby_column_name:
:param watermark_frequency:
:param select_column_names:
:param filter_column_name:
:param pipeline_id:
:param operator:
:param condition:
:param value:
:param frequency:
:param sliding_window_value_frequency:
:param sliding_window_value:
:param window_interval_frequency:
:param other_columns:
:return:
Expand All @@ -786,9 +808,9 @@ def add_operator(pipeline_id, operator,
if operator == "select":
return QuestAssisted._select_operator(url, select_column_names)
if operator == "watermark":
return QuestAssisted._watermark_operator(url, filter_column_name, frequency)
return QuestAssisted._watermark_operator(url, watermark_column_name, watermark_frequency)
if operator == "window_group":
return QuestAssisted._window_group_operator(url, filter_column_name, sliding_window_value_frequency,
return QuestAssisted._window_group_operator(url, groupby_column_name, sliding_window_value,
window_interval_frequency, other_columns)
raise ParseError("Please add only one valid sink out of [kafka, s3, snowflake, hive, google_storage]")

Expand Down Expand Up @@ -831,7 +853,7 @@ def _watermark_operator(url, column_name, frequency):
return conn.put(url, data)

@staticmethod
def _window_group_operator(url, column_name, sliding_window_value_frequency, window_interval_frequency,
def _window_group_operator(url, column_name, sliding_window_value, window_interval_frequency,
other_columns):
"""
:param url: API url with pipeline id
Expand All @@ -846,11 +868,11 @@ def _window_group_operator(url, column_name, sliding_window_value_frequency, win
"window_expression":
{"column_name": column_name,
"sliding_window_value": {
"frequency": sliding_window_value_frequency,
"frequency": sliding_window_value,
"unit": "minute"},
"window_interval": {
"frequency": window_interval_frequency,
"unit": "minute"}},
"window_interval": {
"frequency": window_interval_frequency,
"unit": "minute"}},
"other_columns": other_columns,
"action": "count"}}}
return conn.put(url, data)
Expand Down Expand Up @@ -914,7 +936,7 @@ def _source_kinesis(url, schema, data_format, endpoint_url, stream_name, startin
"""
:param url:
:param schema:
:param format:
:param data_format:
:param endpoint_url:
:param stream_name:
:param starting_position:
Expand Down Expand Up @@ -1127,6 +1149,11 @@ def add_registry(registry_name, host,

@staticmethod
def switch_from_assisted(pipeline_id):
"""
Switch pipeline from assisted BYOC or BYOJ
:param pipeline_id:
:return:
"""
conn = Qubole.agent()
url = QuestAssisted.rest_entity_path + '/' + pipeline_id
response = conn.get(url)
Expand Down

0 comments on commit 874888e

Please sign in to comment.