diff --git a/src/sdk/python/rtdip_sdk/data_models/meters/utils/transform.py b/src/sdk/python/rtdip_sdk/data_models/meters/utils/transform.py
index 1fbaabc2a..1f86956a4 100644
--- a/src/sdk/python/rtdip_sdk/data_models/meters/utils/transform.py
+++ b/src/sdk/python/rtdip_sdk/data_models/meters/utils/transform.py
@@ -48,10 +48,10 @@ def process_file(file_source_name_str: str, transformer_list=None) -> str:
     sanitize_map['"'] = ""
     PROCESS_REPLACE = "replace"
     process_definitions: dict = dict()
-    process_definitions[
-        PROCESS_REPLACE
-    ] = lambda source_str, to_be_replaced_str, to_replaced_with_str: source_str.replace(
-        to_be_replaced_str, to_replaced_with_str
+    process_definitions[PROCESS_REPLACE] = (
+        lambda source_str, to_be_replaced_str, to_replaced_with_str: source_str.replace(
+            to_be_replaced_str, to_replaced_with_str
+        )
     )
     sanitize_function = process_definitions[PROCESS_REPLACE]
     ####
diff --git a/src/sdk/python/rtdip_sdk/pipelines/converters/pipeline_job_json.py b/src/sdk/python/rtdip_sdk/pipelines/converters/pipeline_job_json.py
index 218261742..7499ac38f 100644
--- a/src/sdk/python/rtdip_sdk/pipelines/converters/pipeline_job_json.py
+++ b/src/sdk/python/rtdip_sdk/pipelines/converters/pipeline_job_json.py
@@ -71,16 +71,16 @@ def convert(self) -> PipelineJob:
             for step in task["step_list"]:
                 step["component"] = getattr(sys.modules[__name__], step["component"])
                 for param_key, param_value in step["component_parameters"].items():
-                    step["component_parameters"][
-                        param_key
-                    ] = self._try_convert_to_pipeline_secret(param_value)
+                    step["component_parameters"][param_key] = (
+                        self._try_convert_to_pipeline_secret(param_value)
+                    )
                     if not isinstance(
                         step["component_parameters"][param_key], PipelineSecret
                     ) and isinstance(param_value, dict):
                         for key, value in param_value.items():
-                            step["component_parameters"][param_key][
-                                key
-                            ] = self._try_convert_to_pipeline_secret(value)
+                            step["component_parameters"][param_key][key] = (
+                                self._try_convert_to_pipeline_secret(value)
+                            )
 
         return PipelineJob(**pipeline_job_dict)
diff --git a/src/sdk/python/rtdip_sdk/pipelines/destinations/spark/eventhub.py b/src/sdk/python/rtdip_sdk/pipelines/destinations/spark/eventhub.py
index 2062aa28d..6f4da9aac 100644
--- a/src/sdk/python/rtdip_sdk/pipelines/destinations/spark/eventhub.py
+++ b/src/sdk/python/rtdip_sdk/pipelines/destinations/spark/eventhub.py
@@ -200,10 +200,10 @@ def write_batch(self):
         try:
             if eventhub_connection_string in self.options:
                 sc = self.spark.sparkContext
-                self.options[
-                    eventhub_connection_string
-                ] = sc._jvm.org.apache.spark.eventhubs.EventHubsUtils.encrypt(
-                    self.options[eventhub_connection_string]
+                self.options[eventhub_connection_string] = (
+                    sc._jvm.org.apache.spark.eventhubs.EventHubsUtils.encrypt(
+                        self.options[eventhub_connection_string]
+                    )
                 )
             df = self.prepare_columns()
             return df.write.format("eventhubs").options(**self.options).save()
@@ -228,10 +228,10 @@ def write_stream(self):
             )
             if eventhub_connection_string in self.options:
                 sc = self.spark.sparkContext
-                self.options[
-                    eventhub_connection_string
-                ] = sc._jvm.org.apache.spark.eventhubs.EventHubsUtils.encrypt(
-                    self.options[eventhub_connection_string]
+                self.options[eventhub_connection_string] = (
+                    sc._jvm.org.apache.spark.eventhubs.EventHubsUtils.encrypt(
+                        self.options[eventhub_connection_string]
+                    )
                 )
             df = self.prepare_columns()
             df = self.data.select(
diff --git a/src/sdk/python/rtdip_sdk/pipelines/destinations/spark/kafka_eventhub.py b/src/sdk/python/rtdip_sdk/pipelines/destinations/spark/kafka_eventhub.py
index aa801db90..d7d711656 100644
--- a/src/sdk/python/rtdip_sdk/pipelines/destinations/spark/kafka_eventhub.py
+++ b/src/sdk/python/rtdip_sdk/pipelines/destinations/spark/kafka_eventhub.py
@@ -238,10 +238,10 @@ def _configure_options(self, options: dict) -> dict:
             connection_string = self._connection_string_builder(
                 self.connection_string_properties
             )
-            options[
-                "kafka.sasl.jaas.config"
-            ] = '{} required username="$ConnectionString" password="{}";'.format(
-                kafka_package, connection_string
+            options["kafka.sasl.jaas.config"] = (
+                '{} required username="$ConnectionString" password="{}";'.format(
+                    kafka_package, connection_string
+                )
             )  # NOSONAR
 
         if "kafka.request.timeout.ms" not in options:
diff --git a/src/sdk/python/rtdip_sdk/pipelines/destinations/spark/pcdm_to_delta.py b/src/sdk/python/rtdip_sdk/pipelines/destinations/spark/pcdm_to_delta.py
index f819512f6..97ceec5ff 100644
--- a/src/sdk/python/rtdip_sdk/pipelines/destinations/spark/pcdm_to_delta.py
+++ b/src/sdk/python/rtdip_sdk/pipelines/destinations/spark/pcdm_to_delta.py
@@ -388,9 +388,9 @@ def write_stream(self):
 
                 if self.destination_string != None:
                     if string_checkpoint_location is not None:
-                        append_options[
-                            "checkpointLocation"
-                        ] = string_checkpoint_location
+                        append_options["checkpointLocation"] = (
+                            string_checkpoint_location
+                        )
 
                     delta_string = SparkDeltaDestination(
                         data=self.data.select(
@@ -407,9 +407,9 @@ def write_stream(self):
 
                 if self.destination_integer != None:
                     if integer_checkpoint_location is not None:
-                        append_options[
-                            "checkpointLocation"
-                        ] = integer_checkpoint_location
+                        append_options["checkpointLocation"] = (
+                            integer_checkpoint_location
+                        )
 
                     delta_integer = SparkDeltaDestination(
                         data=self.data.select("TagName", "EventTime", "Status", "Value")
diff --git a/src/sdk/python/rtdip_sdk/pipelines/execute/job.py b/src/sdk/python/rtdip_sdk/pipelines/execute/job.py
index 7511ed3eb..4bab4a1fc 100644
--- a/src/sdk/python/rtdip_sdk/pipelines/execute/job.py
+++ b/src/sdk/python/rtdip_sdk/pipelines/execute/job.py
@@ -141,15 +141,15 @@ def _task_setup_dependency_injection(self, step_list: List[PipelineStep]):
             # get secrets
             for param_key, param_value in step.component_parameters.items():
                 if isinstance(param_value, PipelineSecret):
-                    step.component_parameters[
-                        param_key
-                    ] = self._get_secret_provider_attributes(param_value)().get()
+                    step.component_parameters[param_key] = (
+                        self._get_secret_provider_attributes(param_value)().get()
+                    )
                 if isinstance(param_value, dict):
                     for key, value in param_value.items():
                         if isinstance(value, PipelineSecret):
-                            step.component_parameters[param_key][
-                                key
-                            ] = self._get_secret_provider_attributes(value)().get()
+                            step.component_parameters[param_key][key] = (
+                                self._get_secret_provider_attributes(value)().get()
+                            )
 
             provider.add_kwargs(**step.component_parameters)
diff --git a/src/sdk/python/rtdip_sdk/pipelines/sources/spark/eventhub.py b/src/sdk/python/rtdip_sdk/pipelines/sources/spark/eventhub.py
index 5b7f31ed3..e66d027de 100644
--- a/src/sdk/python/rtdip_sdk/pipelines/sources/spark/eventhub.py
+++ b/src/sdk/python/rtdip_sdk/pipelines/sources/spark/eventhub.py
@@ -154,10 +154,10 @@ def read_batch(self) -> DataFrame:
         try:
             if eventhub_connection_string in self.options:
                 sc = self.spark.sparkContext
-                self.options[
-                    eventhub_connection_string
-                ] = sc._jvm.org.apache.spark.eventhubs.EventHubsUtils.encrypt(
-                    self.options[eventhub_connection_string]
+                self.options[eventhub_connection_string] = (
+                    sc._jvm.org.apache.spark.eventhubs.EventHubsUtils.encrypt(
+                        self.options[eventhub_connection_string]
+                    )
                 )
 
             return self.spark.read.format("eventhubs").options(**self.options).load()
@@ -177,10 +177,10 @@ def read_stream(self) -> DataFrame:
         try:
             if eventhub_connection_string in self.options:
                 sc = self.spark.sparkContext
-                self.options[
-                    eventhub_connection_string
-                ] = sc._jvm.org.apache.spark.eventhubs.EventHubsUtils.encrypt(
-                    self.options[eventhub_connection_string]
+                self.options[eventhub_connection_string] = (
+                    sc._jvm.org.apache.spark.eventhubs.EventHubsUtils.encrypt(
+                        self.options[eventhub_connection_string]
+                    )
                 )
 
             return (
diff --git a/src/sdk/python/rtdip_sdk/pipelines/sources/spark/iot_hub.py b/src/sdk/python/rtdip_sdk/pipelines/sources/spark/iot_hub.py
index c883e0e38..2ebf52362 100644
--- a/src/sdk/python/rtdip_sdk/pipelines/sources/spark/iot_hub.py
+++ b/src/sdk/python/rtdip_sdk/pipelines/sources/spark/iot_hub.py
@@ -154,10 +154,10 @@ def read_batch(self) -> DataFrame:
         try:
             if iothub_connection_string in self.options:
                 sc = self.spark.sparkContext
-                self.options[
-                    iothub_connection_string
-                ] = sc._jvm.org.apache.spark.eventhubs.EventHubsUtils.encrypt(
-                    self.options[iothub_connection_string]
+                self.options[iothub_connection_string] = (
+                    sc._jvm.org.apache.spark.eventhubs.EventHubsUtils.encrypt(
+                        self.options[iothub_connection_string]
+                    )
                 )
 
             return self.spark.read.format("eventhubs").options(**self.options).load()
@@ -177,10 +177,10 @@ def read_stream(self) -> DataFrame:
         try:
             if iothub_connection_string in self.options:
                 sc = self.spark.sparkContext
-                self.options[
-                    iothub_connection_string
-                ] = sc._jvm.org.apache.spark.eventhubs.EventHubsUtils.encrypt(
-                    self.options[iothub_connection_string]
+                self.options[iothub_connection_string] = (
+                    sc._jvm.org.apache.spark.eventhubs.EventHubsUtils.encrypt(
+                        self.options[iothub_connection_string]
+                    )
                 )
 
             return (
diff --git a/src/sdk/python/rtdip_sdk/pipelines/sources/spark/kafka_eventhub.py b/src/sdk/python/rtdip_sdk/pipelines/sources/spark/kafka_eventhub.py
index 2dcb1e9d6..e551a827b 100644
--- a/src/sdk/python/rtdip_sdk/pipelines/sources/spark/kafka_eventhub.py
+++ b/src/sdk/python/rtdip_sdk/pipelines/sources/spark/kafka_eventhub.py
@@ -301,10 +301,10 @@ def _configure_options(self, options: dict) -> dict:
             connection_string = self._connection_string_builder(
                 self.connection_string_properties
            )
-            options[
-                "kafka.sasl.jaas.config"
-            ] = '{} required username="$ConnectionString" password="{}";'.format(
-                kafka_package, connection_string
+            options["kafka.sasl.jaas.config"] = (
+                '{} required username="$ConnectionString" password="{}";'.format(
+                    kafka_package, connection_string
+                )
             )  # NOSONAR
 
         if "kafka.request.timeout.ms" not in options:
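
Every hunk in this patch applies the same mechanical reformatting: a long subscript assignment that used to be wrapped by splitting the subscript target across lines now keeps the target on one line and wraps the right-hand side in parentheses instead. This matches the assignment-splitting style of newer releases of the Black formatter, although the diff itself does not name the tool, and runtime behaviour is unchanged in every file. A minimal before/after sketch of the shape, using placeholder names rather than identifiers from this diff:

    # Placeholder names for illustration only.
    # Old layout: the subscript target is broken across lines.
    options[
        some_long_key
    ] = build_value(first_argument, second_argument)

    # New layout: the target stays intact and the right-hand side
    # is wrapped in parentheses instead.
    options[some_long_key] = (
        build_value(first_argument, second_argument)
    )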