Commit

adding changes
harshits committed Jan 2, 2020
1 parent fd4fbb4 commit 1f91a58
Showing 1 changed file with 29 additions and 23 deletions.
52 changes: 29 additions & 23 deletions qds_sdk/quest.py
@@ -609,7 +609,7 @@ def add_sink(pipeline_id, format, data_store,
         raise ParseError("Please add only one valid sink out of [kafka, s3, snowflake, hive, google_storage]")

     @staticmethod
-    def create_pipeline(pipeline_name, schema, format, source_data_store, sink_data_store, checkpoint_location,
+    def create_pipeline(pipeline_name, schema, source_data_format, sink_data_format, source_data_store, sink_data_store, checkpoint_location,
                         cluster_label, output_mode,
                         trigger_interval=None,
                         can_retry=True,
@@ -698,7 +698,7 @@ def create_pipeline(pipeline_name, schema, format, source_data_store, sink_data_
         log.info(response)
         pipeline_id = Quest.get_pipline_id(response)
         pipeline_id = str(pipeline_id)
-        src_response = QuestAssisted.add_source(pipeline_id, schema, format, source_data_store,
+        src_response = QuestAssisted.add_source(pipeline_id, schema, source_data_format, source_data_store,
                                                 endpoint_url=endpoint_url,
                                                 stream_name=stream_name,
                                                 starting_position=starting_position,
@@ -714,7 +714,7 @@ def create_pipeline(pipeline_name, schema, format, source_data_store, sink_data_
                                                 s3_other_settings=s3_other_settings,
                                                 gs_other_settings=gs_other_settings)
         log.info(src_response)
-        sink_reponse = QuestAssisted.add_sink(pipeline_id, format, sink_data_store,
+        sink_reponse = QuestAssisted.add_sink(pipeline_id, sink_data_format, sink_data_store,
                                               kafka_bootstrap_server=kafka_bootstrap_server,
                                               topic=topic,
                                               other_kafka_settings=other_kafka_settings,
@@ -843,14 +843,14 @@ def _window_group_operator(url, column_name, sliding_window_value_frequency, win
         return conn.put(url, data)

     @staticmethod
-    def _source_kafka(url, schema, format, broker, topics, topic_type="multiple", use_registry="write",
+    def _source_kafka(url, schema, data_format, broker, topics, topic_type="multiple", use_registry="write",
                       registry_subject=None, registry_id=None, starting_offsets="latest",
                       other_kafka_consumer_settings=None):
         """
         :param url:
         :param schema:
-        :param format:
+        :param data_format:
         :param broker:
         :param topics:
         :param topic_type:
@@ -877,7 +877,7 @@ def _source_kafka(url, schema, format, broker, topics, topic_type="multiple", us
                 "registry_subject": registry_subject,
                 "registry_id": registry_id,
                 "starting_offsets": starting_offsets,
-                "format": format,
+                "format": data_format,
                 "other_kafka_consumer_settings": other_kafka_consumer_settings
             },
             "data_store": "kafka"
@@ -891,7 +891,7 @@ def _source_kafka(url, schema, format, broker, topics, topic_type="multiple", us
         return response

     @staticmethod
-    def _source_kinesis(url, schema, format, endpoint_url, stream_name, starting_position="latest",
+    def _source_kinesis(url, schema, data_format, endpoint_url, stream_name, starting_position="latest",
                         other_kinesis_settings=None):
         """
@@ -917,7 +917,7 @@ def _source_kinesis(url, schema, format, endpoint_url, stream_name, starting_pos
                 "stream_name": stream_name,
                 "schema": schema,
                 "starting_position": starting_position,
-                "format": format,
+                "format": data_format,
                 "other_kinesis_settings": other_kinesis_settings
             },
             "data_store": "kinesis"
@@ -928,11 +928,11 @@ def _source_kinesis(url, schema, format, endpoint_url, stream_name, starting_pos
         return conn.put(url, data)

     @staticmethod
-    def _source_s3(url, schema, format, path, other_settings=None):
+    def _source_s3(url, schema, data_format, path, other_settings=None):
         """
         :param url: API url with pipeline id
         :param schema:
-        :param format:
+        :param data_format:
         :param other_settings: {"fileNameOnly": "false", "latestFirst": "false"}
         :return:
         """
@@ -945,7 +945,7 @@ def _source_s3(url, schema, format, path, other_settings=None):
             "fields": {
                 "path": path,
                 "schema": schema,
-                "format": format,
+                "format": data_format,
                 "other_settings": other_settings
             },
             "data_store": "s3"
@@ -956,11 +956,11 @@ def _source_s3(url, schema, format, path, other_settings=None):
         return conn.put(url, data)

     @staticmethod
-    def _source_google_storage(url, schema, format, source_path, other_settings=None):
+    def _source_google_storage(url, schema, data_format, source_path, other_settings=None):
         """
         :param url: API url with pipeline id
         :param schema:
-        :param format:
+        :param data_format:
         :param other_settings:
         :return:
         """
@@ -971,7 +971,7 @@ def _source_google_storage(url, schema, format, source_path, other_settings=None
             {"attributes":
                  {"fields":
                       {"path": source_path,
-                       "format": format,
+                       "format": data_format,
                        "schema": schema,
                        "other_settings": other_settings
                        },
@@ -982,11 +982,11 @@ def _source_google_storage(url, schema, format, source_path, other_settings=None
         return conn.put(url, data)

     @staticmethod
-    def _sink_kafka(url, format, kafka_bootstrap_server, topic, other_kafka_settings=None):
+    def _sink_kafka(url, data_format, kafka_bootstrap_server, topic, other_kafka_settings=None):
         """
         :param url:
-        :param format:
+        :param data_format:
         :param kafka_bootstrap_server:
         :param topic:
         :param other_kafka_settings:
@@ -997,17 +997,17 @@ def _sink_kafka(url, format, kafka_bootstrap_server, topic, other_kafka_settings
             other_kafka_settings = {"kafka.max.block.ms": 60000}
         data = {"data": {"attributes": {
             "fields": {"kafka_bootstrap_server": kafka_bootstrap_server, "topic": topic,
-                       "format": format, "other_kafka_settings": other_kafka_settings},
+                       "format": data_format, "other_kafka_settings": other_kafka_settings},
             "data_store": "kafka"},
             "type": "sink"}}
         return conn.put(url, data)

     @staticmethod
-    def _sink_s3(url, format, path, partition, other_configurations=None):
+    def _sink_s3(url, data_format, path, partition, other_configurations=None):
         """
         :param url:
-        :param format:
+        :param data_format:
         :param path:
         :param partition:
         :param other_configurations:
@@ -1016,8 +1016,8 @@ def _sink_s3(url, format, path, partition, other_configurations=None):
         conn = Qubole.agent()
         data = {"data": {"attributes": {
             "fields": {"path": path, "partition_by": partition,
-                       "other_configurations": other_configurations, "format": format},
-            "data_store": "s3"}, "type": "sink"}}
+                       "other_configurations": other_configurations, "format": data_format},
+            "data_store": "s3"}, "type": "sink"}}
         return conn.put(url, data)

     @staticmethod
@@ -1072,9 +1072,15 @@ def _sink_google_storage(url, format, sink_path, partition_by, other_configurati
         return conn.put(url, data)

     @staticmethod
-    def add_registry(registry_name, host, port=8081, registry_type='schema', use_gateway=False, gateway_ip=None,
-                     gateway_port=None,
-                     gateway_username=None, gateway_private_key=None, **kwargs):
+    def add_registry(registry_name, host,
+                     port=8081,
+                     registry_type='schema',
+                     use_gateway=False,
+                     gateway_ip=None,
+                     gateway_port=None,
+                     gateway_username=None,
+                     gateway_private_key=None,
+                     **kwargs):
         """
         :param registry_name: Name of Registry
         :param registry_type:
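
Taken together, the hunks above split create_pipeline's single format argument into source_data_format and sink_data_format, and rename format to data_format in the source and sink helpers so the parameter no longer shadows Python's built-in format. Below is a minimal sketch of a call against the new signature; the token, schema value, stream, broker, and topic are placeholders, and the keyword arguments are assumed to behave as the hunks above suggest:

    from qds_sdk.qubole import Qubole
    from qds_sdk.quest import QuestAssisted

    # Placeholder credentials and endpoints, for illustration only.
    Qubole.configure(api_token="<api_token>")

    # After this change, source and sink formats are independent arguments
    # instead of one shared `format` parameter.
    response = QuestAssisted.create_pipeline(
        pipeline_name="demo-pipeline",
        schema={"id": "string", "ts": "string"},   # placeholder schema
        source_data_format="json",                 # was: format
        sink_data_format="json",                   # was: format
        source_data_store="kinesis",
        sink_data_store="kafka",
        checkpoint_location="s3://example-bucket/checkpoints/",
        cluster_label="spark-streaming",
        output_mode="append",
        # kinesis source kwargs seen in the diff:
        endpoint_url="https://kinesis.us-east-1.amazonaws.com",
        stream_name="demo-stream",
        starting_position="latest",
        # kafka sink kwargs seen in the diff:
        kafka_bootstrap_server="broker-1:9092",
        topic="demo-topic",
    )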
