Skip to content

Commit

Permalink
Update Mirico Tranformer with an optional tag name field to be append…
Browse files Browse the repository at this point in the history
…ed to TagName column (#654)

* add optional tagname field

Signed-off-by: Chloe Ching <[email protected]>

* added optional tagname field to mirico transformer

Signed-off-by: Chloe Ching <[email protected]>

---------

Signed-off-by: Chloe Ching <[email protected]>
  • Loading branch information
cching95 authored Feb 5, 2024
1 parent bf14c96 commit 88fb840
Show file tree
Hide file tree
Showing 2 changed files with 29 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@
map_values,
concat_ws,
to_timestamp,
upper,
when,
)
from ...._sdk_utils.compare_versions import (
_package_version_meets_minimum,
Expand All @@ -47,6 +49,7 @@ class MiricoJsonToPCDMTransformer(TransformerInterface):
source_column_name="body",
status_null_value="Good",
change_type_value="insert"
tagname_field="test"
)
result = mirico_json_to_pcdm_transformer.transform()
Expand All @@ -57,6 +60,7 @@ class MiricoJsonToPCDMTransformer(TransformerInterface):
source_column_name (str): Spark Dataframe column containing the OPC Publisher Json OPC UA data
status_null_value (optional str): If populated, will replace 'Good' in the Status column with the specified value.
change_type_value (optional str): If populated, will replace 'insert' in the ChangeType column with the specified value.
tagname_field (optional str): If populated, will add the specified field to the TagName column.
"""

data: DataFrame
Expand All @@ -70,12 +74,14 @@ def __init__(
source_column_name: str,
status_null_value: str = "Good",
change_type_value: str = "insert",
tagname_field: str = None,
) -> None:
_package_version_meets_minimum("pyspark", "3.4.0")
self.data = data
self.source_column_name = source_column_name
self.status_null_value = status_null_value
self.change_type_value = change_type_value
self.tagname_field = tagname_field

@staticmethod
def system_type():
Expand Down Expand Up @@ -125,7 +131,23 @@ def transform(self) -> DataFrame:
)
.withColumn("Status", lit("Good"))
.withColumn("ChangeType", lit("insert"))
.withColumn("TagName", concat_ws(":", *[col("SiteName"), col("key")]))
.withColumn(
"TagName",
when(
lit(self.tagname_field).isNotNull(),
concat_ws(
":",
*[
upper(lit(self.tagname_field)),
concat_ws(
"_", *[upper(col("SiteName")), upper(col("key"))]
),
]
),
).otherwise(
concat_ws("_", *[upper(col("SiteName")), upper(col("key"))])
),
)
)
return df.select(
"EventTime", "TagName", "Status", "Value", "ValueType", "ChangeType"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,9 @@


def test_mirico_json_to_pcdm(spark_session: SparkSession):
mirico_json_data = '{"timeStamp": "2023-11-03T16:21:16", "siteName": "20231016AMEPReleaseTesting1"}'
mirico_json_data = (
'{"timeStamp": "2023-11-03T16:21:16", "siteName": "test_site_name"}'
)
mirico_df: DataFrame = spark_session.createDataFrame([{"body": mirico_json_data}])

expected_schema = StructType(
Expand All @@ -52,17 +54,17 @@ def test_mirico_json_to_pcdm(spark_session: SparkSession):
expected_data = [
{
"EventTime": EVENTTIME,
"TagName": "20231016AMEPReleaseTesting1:timeStamp",
"TagName": "TEST_SITE_NAME_TIMESTAMP",
"Status": "Good",
"Value": "2023-11-03T16:21:16",
"ValueType": "string",
"ChangeType": "insert",
},
{
"EventTime": EVENTTIME,
"TagName": "20231016AMEPReleaseTesting1:siteName",
"TagName": "TEST_SITE_NAME_SITENAME",
"Status": "Good",
"Value": "20231016AMEPReleaseTesting1",
"Value": "test_site_name",
"ValueType": "integer",
"ChangeType": "insert",
},
Expand Down

0 comments on commit 88fb840

Please sign in to comment.