Additional formatting with black
ummer-shell committed Jul 25, 2024
1 parent c4f482e commit a1448f7
Showing 9 changed files with 54 additions and 54 deletions.
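
Every hunk in this commit applies the same Black formatting change: a subscripted assignment that was previously wrapped by splitting the subscript across lines is now wrapped by parenthesizing the right-hand side instead. This appears to reflect a newer Black stable style, which prefers splitting the right-hand side of an assignment rather than the left-hand subscript. A minimal, runnable sketch of the before/after pattern, using hypothetical names that are not taken from the repository:

    # Hypothetical example of the rewrapping applied throughout this diff.
    options = {}

    def build_value(first: str, second: str) -> str:
        return f"{first}:{second}"

    # Old wrapping: the subscript is split across lines to respect the line length.
    options[
        "some.long.configuration.key"
    ] = build_value(
        "username", "password"
    )

    # New wrapping (as produced in this commit): the subscript stays on one line
    # and the right-hand side is wrapped in parentheses instead.
    options["some.long.configuration.key"] = (
        build_value(
            "username", "password"
        )
    )

The two forms are equivalent at runtime; only the layout changes, which is why the diff shows equal numbers of additions and deletions.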
@@ -48,10 +48,10 @@ def process_file(file_source_name_str: str, transformer_list=None) -> str:
     sanitize_map['"'] = ""
     PROCESS_REPLACE = "replace"
     process_definitions: dict = dict()
-    process_definitions[
-        PROCESS_REPLACE
-    ] = lambda source_str, to_be_replaced_str, to_replaced_with_str: source_str.replace(
-        to_be_replaced_str, to_replaced_with_str
+    process_definitions[PROCESS_REPLACE] = (
+        lambda source_str, to_be_replaced_str, to_replaced_with_str: source_str.replace(
+            to_be_replaced_str, to_replaced_with_str
+        )
     )
     sanitize_function = process_definitions[PROCESS_REPLACE]
     ####
@@ -71,16 +71,16 @@ def convert(self) -> PipelineJob:
             for step in task["step_list"]:
                 step["component"] = getattr(sys.modules[__name__], step["component"])
                 for param_key, param_value in step["component_parameters"].items():
-                    step["component_parameters"][
-                        param_key
-                    ] = self._try_convert_to_pipeline_secret(param_value)
+                    step["component_parameters"][param_key] = (
+                        self._try_convert_to_pipeline_secret(param_value)
+                    )
                     if not isinstance(
                         step["component_parameters"][param_key], PipelineSecret
                     ) and isinstance(param_value, dict):
                         for key, value in param_value.items():
-                            step["component_parameters"][param_key][
-                                key
-                            ] = self._try_convert_to_pipeline_secret(value)
+                            step["component_parameters"][param_key][key] = (
+                                self._try_convert_to_pipeline_secret(value)
+                            )

         return PipelineJob(**pipeline_job_dict)

@@ -200,10 +200,10 @@ def write_batch(self):
         try:
             if eventhub_connection_string in self.options:
                 sc = self.spark.sparkContext
-                self.options[
-                    eventhub_connection_string
-                ] = sc._jvm.org.apache.spark.eventhubs.EventHubsUtils.encrypt(
-                    self.options[eventhub_connection_string]
+                self.options[eventhub_connection_string] = (
+                    sc._jvm.org.apache.spark.eventhubs.EventHubsUtils.encrypt(
+                        self.options[eventhub_connection_string]
+                    )
                 )
             df = self.prepare_columns()
             return df.write.format("eventhubs").options(**self.options).save()
@@ -228,10 +228,10 @@ def write_stream(self):
             )
             if eventhub_connection_string in self.options:
                 sc = self.spark.sparkContext
-                self.options[
-                    eventhub_connection_string
-                ] = sc._jvm.org.apache.spark.eventhubs.EventHubsUtils.encrypt(
-                    self.options[eventhub_connection_string]
+                self.options[eventhub_connection_string] = (
+                    sc._jvm.org.apache.spark.eventhubs.EventHubsUtils.encrypt(
+                        self.options[eventhub_connection_string]
+                    )
                 )
             df = self.prepare_columns()
             df = self.data.select(
@@ -238,10 +238,10 @@ def _configure_options(self, options: dict) -> dict:
             connection_string = self._connection_string_builder(
                 self.connection_string_properties
             )
-            options[
-                "kafka.sasl.jaas.config"
-            ] = '{} required username="$ConnectionString" password="{}";'.format(
-                kafka_package, connection_string
+            options["kafka.sasl.jaas.config"] = (
+                '{} required username="$ConnectionString" password="{}";'.format(
+                    kafka_package, connection_string
+                )
             )  # NOSONAR

         if "kafka.request.timeout.ms" not in options:
@@ -388,9 +388,9 @@ def write_stream(self):

                 if self.destination_string != None:
                     if string_checkpoint_location is not None:
-                        append_options[
-                            "checkpointLocation"
-                        ] = string_checkpoint_location
+                        append_options["checkpointLocation"] = (
+                            string_checkpoint_location
+                        )

                     delta_string = SparkDeltaDestination(
                         data=self.data.select(
Expand All @@ -407,9 +407,9 @@ def write_stream(self):

if self.destination_integer != None:
if integer_checkpoint_location is not None:
append_options[
"checkpointLocation"
] = integer_checkpoint_location
append_options["checkpointLocation"] = (
integer_checkpoint_location
)

delta_integer = SparkDeltaDestination(
data=self.data.select("TagName", "EventTime", "Status", "Value")
12 changes: 6 additions & 6 deletions src/sdk/python/rtdip_sdk/pipelines/execute/job.py
@@ -141,15 +141,15 @@ def _task_setup_dependency_injection(self, step_list: List[PipelineStep]):
             # get secrets
             for param_key, param_value in step.component_parameters.items():
                 if isinstance(param_value, PipelineSecret):
-                    step.component_parameters[
-                        param_key
-                    ] = self._get_secret_provider_attributes(param_value)().get()
+                    step.component_parameters[param_key] = (
+                        self._get_secret_provider_attributes(param_value)().get()
+                    )
                 if isinstance(param_value, dict):
                     for key, value in param_value.items():
                         if isinstance(value, PipelineSecret):
-                            step.component_parameters[param_key][
-                                key
-                            ] = self._get_secret_provider_attributes(value)().get()
+                            step.component_parameters[param_key][key] = (
+                                self._get_secret_provider_attributes(value)().get()
+                            )

             provider.add_kwargs(**step.component_parameters)

16 changes: 8 additions & 8 deletions src/sdk/python/rtdip_sdk/pipelines/sources/spark/eventhub.py
@@ -154,10 +154,10 @@ def read_batch(self) -> DataFrame:
         try:
             if eventhub_connection_string in self.options:
                 sc = self.spark.sparkContext
-                self.options[
-                    eventhub_connection_string
-                ] = sc._jvm.org.apache.spark.eventhubs.EventHubsUtils.encrypt(
-                    self.options[eventhub_connection_string]
+                self.options[eventhub_connection_string] = (
+                    sc._jvm.org.apache.spark.eventhubs.EventHubsUtils.encrypt(
+                        self.options[eventhub_connection_string]
+                    )
                 )

             return self.spark.read.format("eventhubs").options(**self.options).load()
Expand All @@ -177,10 +177,10 @@ def read_stream(self) -> DataFrame:
try:
if eventhub_connection_string in self.options:
sc = self.spark.sparkContext
self.options[
eventhub_connection_string
] = sc._jvm.org.apache.spark.eventhubs.EventHubsUtils.encrypt(
self.options[eventhub_connection_string]
self.options[eventhub_connection_string] = (
sc._jvm.org.apache.spark.eventhubs.EventHubsUtils.encrypt(
self.options[eventhub_connection_string]
)
)

return (
16 changes: 8 additions & 8 deletions src/sdk/python/rtdip_sdk/pipelines/sources/spark/iot_hub.py
@@ -154,10 +154,10 @@ def read_batch(self) -> DataFrame:
         try:
             if iothub_connection_string in self.options:
                 sc = self.spark.sparkContext
-                self.options[
-                    iothub_connection_string
-                ] = sc._jvm.org.apache.spark.eventhubs.EventHubsUtils.encrypt(
-                    self.options[iothub_connection_string]
+                self.options[iothub_connection_string] = (
+                    sc._jvm.org.apache.spark.eventhubs.EventHubsUtils.encrypt(
+                        self.options[iothub_connection_string]
+                    )
                 )

             return self.spark.read.format("eventhubs").options(**self.options).load()
Expand All @@ -177,10 +177,10 @@ def read_stream(self) -> DataFrame:
try:
if iothub_connection_string in self.options:
sc = self.spark.sparkContext
self.options[
iothub_connection_string
] = sc._jvm.org.apache.spark.eventhubs.EventHubsUtils.encrypt(
self.options[iothub_connection_string]
self.options[iothub_connection_string] = (
sc._jvm.org.apache.spark.eventhubs.EventHubsUtils.encrypt(
self.options[iothub_connection_string]
)
)

return (
@@ -301,10 +301,10 @@ def _configure_options(self, options: dict) -> dict:
             connection_string = self._connection_string_builder(
                 self.connection_string_properties
            )
-            options[
-                "kafka.sasl.jaas.config"
-            ] = '{} required username="$ConnectionString" password="{}";'.format(
-                kafka_package, connection_string
+            options["kafka.sasl.jaas.config"] = (
+                '{} required username="$ConnectionString" password="{}";'.format(
+                    kafka_package, connection_string
+                )
             )  # NOSONAR

         if "kafka.request.timeout.ms" not in options:
