Skip to content

Commit

Permalink
adding changes in request format of sink_s3
Browse files Browse the repository at this point in the history
  • Loading branch information
harshits committed Jan 2, 2020
1 parent 1f91a58 commit 20d9d7c
Showing 1 changed file with 18 additions and 10 deletions.
28 changes: 18 additions & 10 deletions qds_sdk/quest.py
Original file line number Diff line number Diff line change
Expand Up @@ -563,7 +563,7 @@ def add_source(pipeline_id, schema, format, data_store,
raise ParseError("Please add only one valid source out of [kafka, s3, kinesis]")

@staticmethod
def add_sink(pipeline_id, format, data_store,
def add_sink(pipeline_id, data_format, data_store,
kafka_bootstrap_server=None,
topic=None,
other_kafka_settings=None,
Expand Down Expand Up @@ -593,15 +593,15 @@ def add_sink(pipeline_id, format, data_store,
"""
url = Quest.rest_entity_path + "/" + pipeline_id + "/node"
if data_store == "kafka":
return QuestAssisted._sink_kafka(url, format, kafka_bootstrap_server, topic,
return QuestAssisted._sink_kafka(url, data_format, kafka_bootstrap_server, topic,
other_kafka_settings=other_kafka_settings)
if data_store == "s3":
return QuestAssisted._sink_s3(url, format, sink_path, partition_by,
return QuestAssisted._sink_s3(url, data_format, sink_path, partition_by,
other_configurations=other_s3_configurations)
if data_store == "snowflake":
return QuestAssisted._sink_snowflake(url, format)
return QuestAssisted._sink_snowflake(url, data_format)
if data_store == "google_storage":
QuestAssisted._sink_google_storage(url, format, sink_path, partition_by,
QuestAssisted._sink_google_storage(url, data_format, sink_path, partition_by,
other_configurations=other_gs_configurations)
if data_store == "hive":
QuestAssisted._sink_hive(url, table_name, databases=hive_database,
Expand Down Expand Up @@ -1003,7 +1003,7 @@ def _sink_kafka(url, data_format, kafka_bootstrap_server, topic, other_kafka_set
return conn.put(url, data)

@staticmethod
def _sink_s3(url, data_format, path, partition, other_configurations=None):
def _sink_s3(url, data_format, path, partition_by, other_configurations=None):
"""
:param url:
Expand All @@ -1014,10 +1014,18 @@ def _sink_s3(url, data_format, path, partition, other_configurations=None):
:return:
"""
conn = Qubole.agent()
data = {"data": {"attributes": {
"fields": {"path": path, "partition_by": partition,
"other_configurations": other_configurations, "format": data_format},
"data_store": "s3"}, "type": "sink"}}
if other_configurations is None:
other_configurations = {}
if partition_by is None:
partition_by = ""
data = {"data":
{"attributes": {
"fields": {"path": path, "partition_by": partition_by,
"other_configurations": other_configurations},
"format": data_format,
"data_store": "s3"},
"type": "sink"}
}
return conn.put(url, data)

@staticmethod
Expand Down

0 comments on commit 20d9d7c

Please sign in to comment.