diff --git a/src/sdk/python/rtdip_sdk/pipelines/transformers/spark/mirico_json_to_pcdm.py b/src/sdk/python/rtdip_sdk/pipelines/transformers/spark/mirico_json_to_pcdm.py index 7d77afd15..883955de5 100644 --- a/src/sdk/python/rtdip_sdk/pipelines/transformers/spark/mirico_json_to_pcdm.py +++ b/src/sdk/python/rtdip_sdk/pipelines/transformers/spark/mirico_json_to_pcdm.py @@ -24,6 +24,8 @@ map_values, concat_ws, to_timestamp, + upper, + when, ) from ...._sdk_utils.compare_versions import ( _package_version_meets_minimum, @@ -47,6 +49,7 @@ class MiricoJsonToPCDMTransformer(TransformerInterface): source_column_name="body", status_null_value="Good", change_type_value="insert" + tagname_field="test" ) result = mirico_json_to_pcdm_transformer.transform() @@ -57,6 +60,7 @@ class MiricoJsonToPCDMTransformer(TransformerInterface): source_column_name (str): Spark Dataframe column containing the OPC Publisher Json OPC UA data status_null_value (optional str): If populated, will replace 'Good' in the Status column with the specified value. change_type_value (optional str): If populated, will replace 'insert' in the ChangeType column with the specified value. + tagname_field (optional str): If populated, the value is upper-cased and prepended to the TagName column, separated from the rest of the tag name by ':'. 
""" data: DataFrame @@ -70,12 +74,14 @@ def __init__( source_column_name: str, status_null_value: str = "Good", change_type_value: str = "insert", + tagname_field: str = None, ) -> None: _package_version_meets_minimum("pyspark", "3.4.0") self.data = data self.source_column_name = source_column_name self.status_null_value = status_null_value self.change_type_value = change_type_value + self.tagname_field = tagname_field @staticmethod def system_type(): @@ -125,7 +131,23 @@ def transform(self) -> DataFrame: ) .withColumn("Status", lit("Good")) .withColumn("ChangeType", lit("insert")) - .withColumn("TagName", concat_ws(":", *[col("SiteName"), col("key")])) + .withColumn( + "TagName", + when( + lit(self.tagname_field).isNotNull(), + concat_ws( + ":", + *[ + upper(lit(self.tagname_field)), + concat_ws( + "_", *[upper(col("SiteName")), upper(col("key"))] + ), + ] + ), + ).otherwise( + concat_ws("_", *[upper(col("SiteName")), upper(col("key"))]) + ), + ) ) return df.select( "EventTime", "TagName", "Status", "Value", "ValueType", "ChangeType" diff --git a/tests/sdk/python/rtdip_sdk/pipelines/transformers/spark/test_mirico_json_to_pcdm.py b/tests/sdk/python/rtdip_sdk/pipelines/transformers/spark/test_mirico_json_to_pcdm.py index 645268c60..6798b00f2 100644 --- a/tests/sdk/python/rtdip_sdk/pipelines/transformers/spark/test_mirico_json_to_pcdm.py +++ b/tests/sdk/python/rtdip_sdk/pipelines/transformers/spark/test_mirico_json_to_pcdm.py @@ -35,7 +35,9 @@ def test_mirico_json_to_pcdm(spark_session: SparkSession): - mirico_json_data = '{"timeStamp": "2023-11-03T16:21:16", "siteName": "20231016AMEPReleaseTesting1"}' + mirico_json_data = ( + '{"timeStamp": "2023-11-03T16:21:16", "siteName": "test_site_name"}' + ) mirico_df: DataFrame = spark_session.createDataFrame([{"body": mirico_json_data}]) expected_schema = StructType( @@ -52,7 +54,7 @@ def test_mirico_json_to_pcdm(spark_session: SparkSession): expected_data = [ { "EventTime": EVENTTIME, - "TagName": 
"20231016AMEPReleaseTesting1:timeStamp", + "TagName": "TEST_SITE_NAME_TIMESTAMP", "Status": "Good", "Value": "2023-11-03T16:21:16", "ValueType": "string", @@ -60,9 +62,9 @@ def test_mirico_json_to_pcdm(spark_session: SparkSession): }, { "EventTime": EVENTTIME, - "TagName": "20231016AMEPReleaseTesting1:siteName", + "TagName": "TEST_SITE_NAME_SITENAME", "Status": "Good", - "Value": "20231016AMEPReleaseTesting1", + "Value": "test_site_name", "ValueType": "integer", "ChangeType": "insert", },