From 1f91a58b583dde998760d6d9e65f7c1faea267c9 Mon Sep 17 00:00:00 2001 From: harshits Date: Thu, 2 Jan 2020 14:14:21 +0530 Subject: [PATCH] adding changes --- qds_sdk/quest.py | 52 +++++++++++++++++++++++++++--------------------- 1 file changed, 29 insertions(+), 23 deletions(-) diff --git a/qds_sdk/quest.py b/qds_sdk/quest.py index c1ce32db..4bfba693 100644 --- a/qds_sdk/quest.py +++ b/qds_sdk/quest.py @@ -609,7 +609,7 @@ def add_sink(pipeline_id, format, data_store, raise ParseError("Please add only one valid sink out of [kafka, s3, snowflake, hive, google_storage]") @staticmethod - def create_pipeline(pipeline_name, schema, format, source_data_store, sink_data_store, checkpoint_location, + def create_pipeline(pipeline_name, schema, source_data_format, sink_data_format, source_data_store, sink_data_store, checkpoint_location, cluster_label, output_mode, trigger_interval=None, can_retry=True, @@ -698,7 +698,7 @@ def create_pipeline(pipeline_name, schema, format, source_data_store, sink_data_ log.info(response) pipeline_id = Quest.get_pipline_id(response) pipeline_id = str(pipeline_id) - src_response = QuestAssisted.add_source(pipeline_id, schema, format, source_data_store, + src_response = QuestAssisted.add_source(pipeline_id, schema, source_data_format, source_data_store, endpoint_url=endpoint_url, stream_name=stream_name, starting_position=starting_position, @@ -714,7 +714,7 @@ def create_pipeline(pipeline_name, schema, format, source_data_store, sink_data_ s3_other_settings=s3_other_settings, gs_other_settings=gs_other_settings) log.info(src_response) - sink_reponse = QuestAssisted.add_sink(pipeline_id, format, sink_data_store, + sink_reponse = QuestAssisted.add_sink(pipeline_id, sink_data_format, sink_data_store, kafka_bootstrap_server=kafka_bootstrap_server, topic=topic, other_kafka_settings=other_kafka_settings, @@ -843,14 +843,14 @@ def _window_group_operator(url, column_name, sliding_window_value_frequency, win return conn.put(url, data) @staticmethod - def _source_kafka(url, schema, format, broker, topics, topic_type="multiple", use_registry="write", + def _source_kafka(url, schema, data_format, broker, topics, topic_type="multiple", use_registry="write", registry_subject=None, registry_id=None, starting_offsets="latest", other_kafka_consumer_settings=None): """ :param url: :param schema: - :param format: + :param data_format: :param broker: :param topics: :param topic_type: @@ -877,7 +877,7 @@ def _source_kafka(url, schema, format, broker, topics, topic_type="multiple", us "registry_subject": registry_subject, "registry_id": registry_id, "starting_offsets": starting_offsets, - "format": format, + "format": data_format, "other_kafka_consumer_settings": other_kafka_consumer_settings }, "data_store": "kafka" @@ -891,7 +891,7 @@ def _source_kafka(url, schema, format, broker, topics, topic_type="multiple", us return response @staticmethod - def _source_kinesis(url, schema, format, endpoint_url, stream_name, starting_position="latest", + def _source_kinesis(url, schema, data_format, endpoint_url, stream_name, starting_position="latest", other_kinesis_settings=None): """ @@ -917,7 +917,7 @@ def _source_kinesis(url, schema, format, endpoint_url, stream_name, starting_pos "stream_name": stream_name, "schema": schema, "starting_position": starting_position, - "format": format, + "format": data_format, "other_kinesis_settings": other_kinesis_settings }, "data_store": "kinesis" @@ -928,11 +928,11 @@ def _source_kinesis(url, schema, format, endpoint_url, stream_name, starting_pos return conn.put(url, data) @staticmethod - def _source_s3(url, schema, format, path, other_settings=None): + def _source_s3(url, schema, data_format, path, other_settings=None): """ :param url: API url with pipeline id :param schema: - :param format: + :param data_format: :param other_settings: {"fileNameOnly": "false", "latestFirst": "false"} :return: """ @@ -945,7 +945,7 @@ def _source_s3(url, schema, format, path, other_settings=None): "fields": { "path": path, "schema": schema, - "format": format, + "format": data_format, "other_settings": other_settings }, "data_store": "s3" @@ -956,11 +956,11 @@ def _source_s3(url, schema, format, path, other_settings=None): return conn.put(url, data) @staticmethod - def _source_google_storage(url, schema, format, source_path, other_settings=None): + def _source_google_storage(url, schema, data_format, source_path, other_settings=None): """ :param url: API url with pipeline id :param schema: - :param format: + :param data_format: :param other_settings: :return: """ @@ -971,7 +971,7 @@ def _source_google_storage(url, schema, format, source_path, other_settings=None {"attributes": {"fields": {"path": source_path, - "format": format, + "format": data_format, "schema": schema, "other_settings": other_settings }, @@ -982,11 +982,11 @@ def _source_google_storage(url, schema, format, source_path, other_settings=None return conn.put(url, data) @staticmethod - def _sink_kafka(url, format, kafka_bootstrap_server, topic, other_kafka_settings=None): + def _sink_kafka(url, data_format, kafka_bootstrap_server, topic, other_kafka_settings=None): """ :param url: - :param format: + :param data_format: :param kafka_bootstrap_server: :param topic: :param other_kafka_settings: @@ -997,17 +997,17 @@ def _sink_kafka(url, format, kafka_bootstrap_server, topic, other_kafka_settings other_kafka_settings = {"kafka.max.block.ms": 60000} data = {"data": {"attributes": { "fields": {"kafka_bootstrap_server": kafka_bootstrap_server, "topic": topic, - "format": format, "other_kafka_settings": other_kafka_settings}, + "format": data_format, "other_kafka_settings": other_kafka_settings}, "data_store": "kafka"}, "type": "sink"}} return conn.put(url, data) @staticmethod - def _sink_s3(url, format, path, partition, other_configurations=None): + def _sink_s3(url, data_format, path, partition, other_configurations=None): """ :param url: - :param format: + :param data_format: :param path: :param partition: :param other_configurations: @@ -1016,8 +1016,8 @@ def _sink_s3(url, format, path, partition, other_configurations=None): conn = Qubole.agent() data = {"data": {"attributes": { "fields": {"path": path, "partition_by": partition, - "other_configurations": other_configurations, "format": format}, - "data_store": "s3"}, "type": "sink"}} + "other_configurations": other_configurations, "format": data_format}, + "data_store": "s3"}, "type": "sink"}} return conn.put(url, data) @staticmethod @@ -1072,9 +1072,15 @@ def _sink_google_storage(url, format, sink_path, partition_by, other_configurati return conn.put(url, data) @staticmethod - def add_registry(registry_name, host, port=8081, registry_type='schema', use_gateway=False, gateway_ip=None, + def add_registry(registry_name, host, + port=8081, + registry_type='schema', + use_gateway=False, + gateway_ip=None, gateway_port=None, - gateway_username=None, gateway_private_key=None, **kwargs): + gateway_username=None, + gateway_private_key=None, + **kwargs): """ :param registry_name: Name of Registry :param registry_type: