Skip to content

Commit

Permalink
added pipeline_id as attribute
Browse files Browse the repository at this point in the history
  • Loading branch information
harshits committed Jan 3, 2020
1 parent fc52bf9 commit 9dcb367
Showing 1 changed file with 7 additions and 5 deletions.
12 changes: 7 additions & 5 deletions qds_sdk/quest.py
Original file line number Diff line number Diff line change
Expand Up @@ -513,7 +513,7 @@ class QuestAssisted(Quest):
create_type = 1

@staticmethod
def add_source(pipeline_id, schema, format, data_store,
def add_source(pipeline_id, schema, format, data_store, topics=None,
endpoint_url=None,
stream_name=None,
starting_position=None,
Expand All @@ -530,6 +530,7 @@ def add_source(pipeline_id, schema, format, data_store,
gs_other_settings=None):
"""
:param topics:
:param pipeline_id:
:param schema:
:param format:
Expand All @@ -556,7 +557,8 @@ def add_source(pipeline_id, schema, format, data_store,
starting_position=starting_position,
other_kinesis_settings=other_kinesis_settings)
if data_store == "kafka":
return QuestAssisted._source_kafka(url, schema, format, broker, broker,
return QuestAssisted._source_kafka(url, schema, format, broker,
topics=topics,
topic_type=topic_type,
use_registry=use_registry,
registry_subject=registry_subject,
Expand Down Expand Up @@ -619,7 +621,7 @@ def add_sink(pipeline_id, data_format, data_store,

@staticmethod
def create_pipeline(pipeline_name, schema, source_data_format, sink_data_format, source_data_store, sink_data_store, checkpoint_location,
cluster_label, output_mode,
cluster_label, output_mode, topics=None,
trigger_interval=None,
can_retry=True,
command_line_options=None,
Expand All @@ -639,7 +641,6 @@ def create_pipeline(pipeline_name, schema, source_data_format, sink_data_format,
s3_other_settings=None,
gs_other_settings=None,
kafka_bootstrap_server=None,
topic=None,
other_kafka_settings=None,
sink_path=None,
partition_by=None,
Expand Down Expand Up @@ -709,6 +710,7 @@ def create_pipeline(pipeline_name, schema, source_data_format, sink_data_format,
pipeline_id = Quest.get_pipline_id(response)
pipeline_id = str(pipeline_id)
src_response = QuestAssisted.add_source(pipeline_id, schema, source_data_format, source_data_store,
topics=topics,
endpoint_url=endpoint_url,
stream_name=stream_name,
starting_position=starting_position,
Expand All @@ -726,7 +728,7 @@ def create_pipeline(pipeline_name, schema, source_data_format, sink_data_format,
log.info(src_response)
sink_reponse = QuestAssisted.add_sink(pipeline_id, sink_data_format, sink_data_store,
kafka_bootstrap_server=kafka_bootstrap_server,
topic=topic,
topic=topics,
other_kafka_settings=other_kafka_settings,
sink_path=sink_path,
partition_by=partition_by,
Expand Down

0 comments on commit 9dcb367

Please sign in to comment.