Merge pull request #649 from rtdip/develop
v0.9.10
cching95 authored Feb 1, 2024
2 parents bb00383 + 145eee8 commit 052b821
Showing 21 changed files with 272 additions and 158 deletions.
2 changes: 1 addition & 1 deletion environment.yml
@@ -66,7 +66,7 @@ dependencies:
   - xarray>=2023.1.0,<2023.8.0
   - ecmwf-api-client==1.6.3
   - netCDF4==1.6.4
-  - black==23.7.0
+  - black==24.1.0
   - pip:
       - dependency-injector==4.41.0
       - azure-functions==1.15.0
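The only dependency change here is the formatter bump. Black 24.1.0 adopts the 2024 stable style, which among other things wraps multi-line conditional expressions and long assignment right-hand sides in parentheses; that single rule accounts for most of the Python hunks below. A minimal illustrative sketch of the new layout, not taken from the repository (the environment variable name is invented):

import os

# Before (Black 23.x), a conditional expression too long for one line was split
# bare after the "=". With Black 24.1 the whole expression is parenthesised:
hostname = (
    "localhost"
    if os.getenv("HYPOTHETICAL_SERVER_HOSTNAME_OVERRIDE") is None
    else os.getenv("HYPOTHETICAL_SERVER_HOSTNAME_OVERRIDE")
)
print(hostname)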
28 changes: 16 additions & 12 deletions src/api/v1/models.py
@@ -154,22 +154,26 @@ class BaseHeaders:
     def __init__(
         self,
         x_databricks_server_hostname: str = Header(
-            default=...
-            if os.getenv("DATABRICKS_SQL_SERVER_HOSTNAME") is None
-            else os.getenv("DATABRICKS_SQL_SERVER_HOSTNAME"),
+            default=(
+                ...
+                if os.getenv("DATABRICKS_SQL_SERVER_HOSTNAME") is None
+                else os.getenv("DATABRICKS_SQL_SERVER_HOSTNAME")
+            ),
             description="Databricks SQL Server Hostname",
-            include_in_schema=True
-            if os.getenv("DATABRICKS_SQL_SERVER_HOSTNAME") is None
-            else False,
+            include_in_schema=(
+                True if os.getenv("DATABRICKS_SQL_SERVER_HOSTNAME") is None else False
+            ),
         ),
         x_databricks_http_path: str = Header(
-            default=...
-            if os.getenv("DATABRICKS_SQL_HTTP_PATH") is None
-            else os.getenv("DATABRICKS_SQL_HTTP_PATH"),
+            default=(
+                ...
+                if os.getenv("DATABRICKS_SQL_HTTP_PATH") is None
+                else os.getenv("DATABRICKS_SQL_HTTP_PATH")
+            ),
             description="Databricks SQL HTTP Path",
-            include_in_schema=True
-            if os.getenv("DATABRICKS_SQL_HTTP_PATH") is None
-            else False,
+            include_in_schema=(
+                True if os.getenv("DATABRICKS_SQL_HTTP_PATH") is None else False
+            ),
         ),
     ):
         self.x_databricks_server_hostname = x_databricks_server_hostname
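For context, the block above derives header defaults from environment variables: FastAPI treats default=... (Ellipsis) as "required", so each Databricks header is only required, and only shown in the OpenAPI schema, when the corresponding environment variable is unset. A self-contained sketch of the same pattern on a hypothetical endpoint (not RTDIP's actual route):

import os

from fastapi import FastAPI, Header

app = FastAPI()


@app.get("/ping")
def ping(
    x_databricks_server_hostname: str = Header(
        # Required (Ellipsis) when the env var is unset, otherwise defaulted from it
        default=(
            ...
            if os.getenv("DATABRICKS_SQL_SERVER_HOSTNAME") is None
            else os.getenv("DATABRICKS_SQL_SERVER_HOSTNAME")
        ),
        description="Databricks SQL Server Hostname",
        # Hide the header from the schema when the server is already configured
        include_in_schema=os.getenv("DATABRICKS_SQL_SERVER_HOSTNAME") is None,
    ),
):
    return {"hostname": x_databricks_server_hostname}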
12 changes: 6 additions & 6 deletions src/sdk/python/rtdip_sdk/connectors/grpc/spark_connector.py
@@ -49,12 +49,12 @@ def __init__(

         if spark is None:
             self.connection = SparkClient(
-                spark_configuration={}
-                if spark_configuration is None
-                else spark_configuration,
-                spark_libraries=Libraries()
-                if spark_libraries is None
-                else spark_libraries,
+                spark_configuration=(
+                    {} if spark_configuration is None else spark_configuration
+                ),
+                spark_libraries=(
+                    Libraries() if spark_libraries is None else spark_libraries
+                ),
                 spark_remote=spark_remote,
             ).spark_session
         else:
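The reformatted call above also shows the defaulting convention used across the connectors: mutable defaults ({} and Libraries()) are created inside the call when the argument is None rather than in the signature, which avoids Python's shared-mutable-default pitfall. A generic sketch of that idea (the names below are placeholders, not the SDK's API):

def build_client(configuration: dict = None, libraries: list = None):
    # Create fresh objects per call instead of sharing a single default instance
    configuration = {} if configuration is None else configuration
    libraries = [] if libraries is None else libraries
    return configuration, libraries


first_config, _ = build_client()
first_config["key"] = "value"
second_config, _ = build_client()
assert "key" not in second_config  # each call gets its own dict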
@@ -48,10 +48,10 @@ def process_file(file_source_name_str: str, transformer_list=None) -> str:
     sanitize_map['"'] = ""
     PROCESS_REPLACE = "replace"
     process_definitions: dict = dict()
-    process_definitions[
-        PROCESS_REPLACE
-    ] = lambda source_str, to_be_replaced_str, to_replaced_with_str: source_str.replace(
-        to_be_replaced_str, to_replaced_with_str
+    process_definitions[PROCESS_REPLACE] = (
+        lambda source_str, to_be_replaced_str, to_replaced_with_str: source_str.replace(
+            to_be_replaced_str, to_replaced_with_str
+        )
     )
     sanitize_function = process_definitions[PROCESS_REPLACE]
     ####
@@ -71,16 +71,16 @@ def convert(self) -> PipelineJob:
             for step in task["step_list"]:
                 step["component"] = getattr(sys.modules[__name__], step["component"])
                 for param_key, param_value in step["component_parameters"].items():
-                    step["component_parameters"][
-                        param_key
-                    ] = self._try_convert_to_pipeline_secret(param_value)
+                    step["component_parameters"][param_key] = (
+                        self._try_convert_to_pipeline_secret(param_value)
+                    )
                     if not isinstance(
                         step["component_parameters"][param_key], PipelineSecret
                     ) and isinstance(param_value, dict):
                         for key, value in param_value.items():
-                            step["component_parameters"][param_key][
-                                key
-                            ] = self._try_convert_to_pipeline_secret(value)
+                            step["component_parameters"][param_key][key] = (
+                                self._try_convert_to_pipeline_secret(value)
+                            )

         return PipelineJob(**pipeline_job_dict)

@@ -200,10 +200,10 @@ def write_batch(self):
         try:
             if eventhub_connection_string in self.options:
                 sc = self.spark.sparkContext
-                self.options[
-                    eventhub_connection_string
-                ] = sc._jvm.org.apache.spark.eventhubs.EventHubsUtils.encrypt(
-                    self.options[eventhub_connection_string]
+                self.options[eventhub_connection_string] = (
+                    sc._jvm.org.apache.spark.eventhubs.EventHubsUtils.encrypt(
+                        self.options[eventhub_connection_string]
+                    )
                 )
             df = self.prepare_columns()
             return df.write.format("eventhubs").options(**self.options).save()
@@ -228,10 +228,10 @@ def write_stream(self):
             )
             if eventhub_connection_string in self.options:
                 sc = self.spark.sparkContext
-                self.options[
-                    eventhub_connection_string
-                ] = sc._jvm.org.apache.spark.eventhubs.EventHubsUtils.encrypt(
-                    self.options[eventhub_connection_string]
+                self.options[eventhub_connection_string] = (
+                    sc._jvm.org.apache.spark.eventhubs.EventHubsUtils.encrypt(
+                        self.options[eventhub_connection_string]
+                    )
                 )
             df = self.prepare_columns()
             df = self.data.select(
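The write_batch and write_stream changes above (and the matching source-side hunks further down) all touch the same pattern: the azure-event-hubs-spark connector expects the eventhubs.connectionString option to be encrypted, which PySpark code does through the connector's JVM helper. A hedged usage sketch, assuming an active SparkSession named spark with the connector on the classpath and a placeholder connection string:

options = {
    "eventhubs.connectionString": "Endpoint=sb://<namespace>.servicebus.windows.net/;..."  # placeholder
}

sc = spark.sparkContext
# Encrypt the connection string before handing the options to the connector
options["eventhubs.connectionString"] = (
    sc._jvm.org.apache.spark.eventhubs.EventHubsUtils.encrypt(
        options["eventhubs.connectionString"]
    )
)
df = spark.read.format("eventhubs").options(**options).load()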
@@ -234,10 +234,10 @@ def _configure_options(self, options: dict) -> dict:
             connection_string = self._connection_string_builder(
                 self.connection_string_properties
             )
-            options[
-                "kafka.sasl.jaas.config"
-            ] = '{} required username="$ConnectionString" password="{}";'.format(
-                kafka_package, connection_string
+            options["kafka.sasl.jaas.config"] = (
+                '{} required username="$ConnectionString" password="{}";'.format(
+                    kafka_package, connection_string
+                )
             )  # NOSONAR

         if "kafka.request.timeout.ms" not in options:
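The JAAS entry assembled above follows the Event Hubs Kafka endpoint convention: SASL PLAIN with the literal username "$ConnectionString" and the full connection string as the password. An illustrative reconstruction under assumed values (the login module name and connection string are not taken from the file):

kafka_package = "org.apache.kafka.common.security.plain.PlainLoginModule"  # assumed
connection_string = (
    "Endpoint=sb://<namespace>.servicebus.windows.net/;"
    "SharedAccessKeyName=<name>;SharedAccessKey=<key>"  # placeholder
)
jaas_config = '{} required username="$ConnectionString" password="{}";'.format(
    kafka_package, connection_string
)
options = {
    "kafka.security.protocol": "SASL_SSL",
    "kafka.sasl.mechanism": "PLAIN",
    "kafka.sasl.jaas.config": jaas_config,
}
print(jaas_config)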
@@ -388,9 +388,9 @@ def write_stream(self):

             if self.destination_string != None:
                 if string_checkpoint_location is not None:
-                    append_options[
-                        "checkpointLocation"
-                    ] = string_checkpoint_location
+                    append_options["checkpointLocation"] = (
+                        string_checkpoint_location
+                    )

                 delta_string = SparkDeltaDestination(
                     data=self.data.select(
@@ -407,9 +407,9 @@

             if self.destination_integer != None:
                 if integer_checkpoint_location is not None:
-                    append_options[
-                        "checkpointLocation"
-                    ] = integer_checkpoint_location
+                    append_options["checkpointLocation"] = (
+                        integer_checkpoint_location
+                    )

                 delta_integer = SparkDeltaDestination(
                     data=self.data.select("TagName", "EventTime", "Status", "Value")
12 changes: 6 additions & 6 deletions src/sdk/python/rtdip_sdk/pipelines/execute/job.py
@@ -141,15 +141,15 @@ def _task_setup_dependency_injection(self, step_list: List[PipelineStep]):
             # get secrets
             for param_key, param_value in step.component_parameters.items():
                 if isinstance(param_value, PipelineSecret):
-                    step.component_parameters[
-                        param_key
-                    ] = self._get_secret_provider_attributes(param_value)().get()
+                    step.component_parameters[param_key] = (
+                        self._get_secret_provider_attributes(param_value)().get()
+                    )
                 if isinstance(param_value, dict):
                     for key, value in param_value.items():
                         if isinstance(value, PipelineSecret):
-                            step.component_parameters[param_key][
-                                key
-                            ] = self._get_secret_provider_attributes(value)().get()
+                            step.component_parameters[param_key][key] = (
+                                self._get_secret_provider_attributes(value)().get()
+                            )

             provider.add_kwargs(**step.component_parameters)

16 changes: 8 additions & 8 deletions src/sdk/python/rtdip_sdk/pipelines/sources/spark/eventhub.py
@@ -154,10 +154,10 @@ def read_batch(self) -> DataFrame:
         try:
             if eventhub_connection_string in self.options:
                 sc = self.spark.sparkContext
-                self.options[
-                    eventhub_connection_string
-                ] = sc._jvm.org.apache.spark.eventhubs.EventHubsUtils.encrypt(
-                    self.options[eventhub_connection_string]
+                self.options[eventhub_connection_string] = (
+                    sc._jvm.org.apache.spark.eventhubs.EventHubsUtils.encrypt(
+                        self.options[eventhub_connection_string]
+                    )
                 )

             return self.spark.read.format("eventhubs").options(**self.options).load()
@@ -177,10 +177,10 @@ def read_stream(self) -> DataFrame:
         try:
             if eventhub_connection_string in self.options:
                 sc = self.spark.sparkContext
-                self.options[
-                    eventhub_connection_string
-                ] = sc._jvm.org.apache.spark.eventhubs.EventHubsUtils.encrypt(
-                    self.options[eventhub_connection_string]
+                self.options[eventhub_connection_string] = (
+                    sc._jvm.org.apache.spark.eventhubs.EventHubsUtils.encrypt(
+                        self.options[eventhub_connection_string]
+                    )
                 )

             return (
16 changes: 8 additions & 8 deletions src/sdk/python/rtdip_sdk/pipelines/sources/spark/iot_hub.py
@@ -154,10 +154,10 @@ def read_batch(self) -> DataFrame:
         try:
             if iothub_connection_string in self.options:
                 sc = self.spark.sparkContext
-                self.options[
-                    iothub_connection_string
-                ] = sc._jvm.org.apache.spark.eventhubs.EventHubsUtils.encrypt(
-                    self.options[iothub_connection_string]
+                self.options[iothub_connection_string] = (
+                    sc._jvm.org.apache.spark.eventhubs.EventHubsUtils.encrypt(
+                        self.options[iothub_connection_string]
+                    )
                 )

             return self.spark.read.format("eventhubs").options(**self.options).load()
@@ -177,10 +177,10 @@ def read_stream(self) -> DataFrame:
         try:
             if iothub_connection_string in self.options:
                 sc = self.spark.sparkContext
-                self.options[
-                    iothub_connection_string
-                ] = sc._jvm.org.apache.spark.eventhubs.EventHubsUtils.encrypt(
-                    self.options[iothub_connection_string]
+                self.options[iothub_connection_string] = (
+                    sc._jvm.org.apache.spark.eventhubs.EventHubsUtils.encrypt(
+                        self.options[iothub_connection_string]
+                    )
                 )

             return (
@@ -293,10 +293,10 @@ def _configure_options(self, options: dict) -> dict:
             connection_string = self._connection_string_builder(
                 self.connection_string_properties
             )
-            options[
-                "kafka.sasl.jaas.config"
-            ] = '{} required username="$ConnectionString" password="{}";'.format(
-                kafka_package, connection_string
+            options["kafka.sasl.jaas.config"] = (
+                '{} required username="$ConnectionString" password="{}";'.format(
+                    kafka_package, connection_string
+                )
             )  # NOSONAR

         if "kafka.request.timeout.ms" not in options:
@@ -96,16 +96,19 @@ def _convert_binary_to_pandas(pdf):
             }
         )

-        value_type = str(table.schema.field("Value").type)
-        if value_type == "int16" or value_type == "int32":
-            value_type = "integer"
-
         output_pdf = table.to_pandas()

+        if "ValueType" not in output_pdf.columns:
+            value_type = str(table.schema.field("Value").type)
+            if value_type == "int16" or value_type == "int32":
+                value_type = "integer"
+            output_pdf["ValueType"] = value_type
+
+        if "ChangeType" not in output_pdf.columns:
+            output_pdf["ChangeType"] = "insert"
+
         output_pdf["EventDate"] = output_pdf["EventTime"].dt.date
         output_pdf["Value"] = output_pdf["Value"].astype(str)
-        output_pdf["ChangeType"] = "insert"
-        output_pdf["ValueType"] = value_type
         output_pdf = output_pdf[
             [
                 "EventDate",
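Unlike the purely stylistic hunks, this one changes behaviour: ValueType and ChangeType are now only filled in when the incoming frame does not already carry those columns, so caller-supplied values survive instead of being overwritten. A minimal pandas sketch of that guard with made-up data:

import pandas as pd

output_pdf = pd.DataFrame({"Value": ["1", "2"], "ChangeType": ["update", "delete"]})

if "ValueType" not in output_pdf.columns:
    output_pdf["ValueType"] = "integer"  # defaulted only when absent

if "ChangeType" not in output_pdf.columns:
    output_pdf["ChangeType"] = "insert"  # not reached here: existing values are kept

print(output_pdf)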
@@ -150,10 +150,9 @@ def _set_acl(
             if group_object_id in acl:
                 acl_props_list.remove(acl)

+        acl_props_list.append(group_id_acl)
         if set_as_default_acl == True:
             acl_props_list.append("default:{}".format(group_id_acl))
-        else:
-            acl_props_list.append(group_id_acl)

         new_acl_props = ",".join(acl_props_list)
         acl_directory_client.set_access_control(acl=new_acl_props)
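This is the other behavioural change in the release: the group ACL entry is now always appended, and when set_as_default_acl is true a matching default: entry is appended as well, rather than replacing the access entry. A small standalone sketch with hypothetical ACL strings:

group_id_acl = "group:00000000-0000-0000-0000-000000000000:r-x"  # hypothetical entry
acl_props_list = ["user::rwx", "group::r-x", "other::---"]
set_as_default_acl = True

acl_props_list.append(group_id_acl)
if set_as_default_acl:
    acl_props_list.append("default:{}".format(group_id_acl))

print(",".join(acl_props_list))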