diff --git a/docs/sdk/code-reference/pipelines/transformers/spark/mirico_json_to_pcdm.md b/docs/sdk/code-reference/pipelines/transformers/spark/mirico_json_to_pcdm.md new file mode 100644 index 000000000..afbffeae6 --- /dev/null +++ b/docs/sdk/code-reference/pipelines/transformers/spark/mirico_json_to_pcdm.md @@ -0,0 +1,2 @@ +# Convert Mirico Json to Process Control Data Model +::: src.sdk.python.rtdip_sdk.pipelines.transformers.spark.mirico_json_to_pcdm \ No newline at end of file diff --git a/mkdocs.yml b/mkdocs.yml index 164934d8c..b27f43d5e 100644 --- a/mkdocs.yml +++ b/mkdocs.yml @@ -195,6 +195,7 @@ nav: - PySpark to Pandas DataFrame Conversion: sdk/code-reference/pipelines/transformers/spark/pyspark_to_pandas.md - Honeywell APM To Process Control Data Model: sdk/code-reference/pipelines/transformers/spark/honeywell_apm_to_pcdm.md - Process Control Data Model To Honeywell APM: sdk/code-reference/pipelines/transformers/spark/pcdm_to_honeywell_apm.md + - Mirico data To Process Control Data Model: sdk/code-reference/pipelines/transformers/spark/mirico_json_to_pcdm.md - ISO: - MISO To Meters Data Model: sdk/code-reference/pipelines/transformers/spark/iso/miso_to_mdm.md - PJM To Meters Data Model: sdk/code-reference/pipelines/transformers/spark/iso/pjm_to_mdm.md diff --git a/src/sdk/python/rtdip_sdk/pipelines/_pipeline_utils/mirico_field_mappings.py b/src/sdk/python/rtdip_sdk/pipelines/_pipeline_utils/mirico_field_mappings.py new file mode 100644 index 000000000..a7f9a6134 --- /dev/null +++ b/src/sdk/python/rtdip_sdk/pipelines/_pipeline_utils/mirico_field_mappings.py @@ -0,0 +1,21 @@ +MIRICO_FIELD_MAPPINGS = { + 0: {"TagName": "timeStamp", "ValueType": "string"}, + 1: {"TagName": "gasTypeId", "ValueType": "integer"}, + 2: {"TagName": "pathLengthMeters", "ValueType": "float"}, + 3: {"TagName": "quality", "ValueType": "integer"}, + 4: {"TagName": "windBearingDegreesTo", "ValueType": "float"}, + 5: {"TagName": "windSpeedMetersPerSecond", "ValueType": "float"}, + 6: {"TagName": "pressureMillibar", "ValueType": "float"}, + 7: {"TagName": "temperatureKelvin", "ValueType": "float"}, + 8: {"TagName": "gasPpm", "ValueType": "float"}, + 9: {"TagName": "gasType", "ValueType": "string"}, + 10: {"TagName": "retroLongitude", "ValueType": "float"}, + 11: {"TagName": "retroLatitude", "ValueType": "float"}, + 12: {"TagName": "sensorAltitude", "ValueType": "float"}, + 13: {"TagName": "sensorLongitude", "ValueType": "float"}, + 14: {"TagName": "sensorLatitude", "ValueType": "float"}, + 15: {"TagName": "retroName", "ValueType": "string"}, + 16: {"TagName": "siteKey", "ValueType": "string"}, + 17: {"TagName": "siteName", "ValueType": "string"}, + 18: {"TagName": "retroAltitude", "ValueType": "float"}, +} diff --git a/src/sdk/python/rtdip_sdk/pipelines/_pipeline_utils/spark.py b/src/sdk/python/rtdip_sdk/pipelines/_pipeline_utils/spark.py index 233c8078d..0f60d035e 100644 --- a/src/sdk/python/rtdip_sdk/pipelines/_pipeline_utils/spark.py +++ b/src/sdk/python/rtdip_sdk/pipelines/_pipeline_utils/spark.py @@ -25,6 +25,7 @@ IntegerType, ArrayType, DoubleType, + FloatType, ) from .models import Libraries @@ -311,3 +312,27 @@ def get_dbutils( StructField("sourceName", StringType(), True), ] ) + +MIRICO_SCHEMA = StructType( + [ + StructField("retroName", StringType(), True), + StructField("temperatureKelvin", FloatType(), True), + StructField("siteName", StringType(), True), + StructField("pressureMillibar", FloatType(), True), + StructField("windSpeedMetersPerSecond", FloatType(), True), + StructField("windBearingDegreesTo", FloatType(), True), + StructField("pathLengthMeters", FloatType(), True), + StructField("retroAltitude", FloatType(), True), + StructField("sensorAltitude", FloatType(), True), + StructField("quality", IntegerType(), True), + StructField("timeStamp", StringType(), True), + StructField("siteKey", StringType(), True), + StructField("gasTypeId", IntegerType(), True), + StructField("retroLongitude", FloatType(), True), + StructField("gasType", StringType(), True), + StructField("sensorLatitude", FloatType(), True), + StructField("gasPpm", FloatType(), True), + StructField("retroLatitude", FloatType(), True), + StructField("sensorLongitude", FloatType(), True), + ] +) diff --git a/src/sdk/python/rtdip_sdk/pipelines/transformers/spark/mirico_json_to_pcdm.py b/src/sdk/python/rtdip_sdk/pipelines/transformers/spark/mirico_json_to_pcdm.py new file mode 100644 index 000000000..9a5751970 --- /dev/null +++ b/src/sdk/python/rtdip_sdk/pipelines/transformers/spark/mirico_json_to_pcdm.py @@ -0,0 +1,132 @@ +# Copyright 2022 RTDIP +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +from pyspark.sql import DataFrame +import logging +from pyspark.sql.functions import ( + from_json, + col, + posexplode, + lit, + udf, + map_from_arrays, + map_keys, + map_values, + concat_ws, +) +from ...._sdk_utils.compare_versions import ( + _package_version_meets_minimum, +) +from ..interfaces import TransformerInterface +from ..._pipeline_utils.models import Libraries, SystemType +from ..._pipeline_utils.spark import SEM_SCHEMA +from ..._pipeline_utils import mirico_field_mappings + + +class MiricoJsonToPCDMTransformer(TransformerInterface): + """ + Converts a Spark Dataframe column containing a json string created from Mirico to the Process Control Data Model. + + Example + -------- + ```python + from rtdip_sdk.pipelines.transformers import MiricoJsonToPCDMTransformer + + mirico_json_to_pcdm_transformer = MiricoJsonToPCDMTransformer( + data=df + source_column_name="body", + status_null_value="Good", + change_type_value="insert" + ) + + result = mirico_json_to_pcdm_transformer.transform() + ``` + + Parameters: + data (DataFrame): Dataframe containing the column with SEM data + source_column_name (str): Spark Dataframe column containing the OPC Publisher Json OPC UA data + status_null_value (optional str): If populated, will replace 'Good' in the Status column with the specified value. + change_type_value (optional str): If populated, will replace 'insert' in the ChangeType column with the specified value. + """ + + data: DataFrame + source_column_name: str + status_null_value: str + change_type_value: str + + def __init__( + self, + data: DataFrame, + source_column_name: str, + status_null_value: str = "Good", + change_type_value: str = "insert", + ) -> None: + _package_version_meets_minimum("pyspark", "3.4.0") + self.data = data + self.source_column_name = source_column_name + self.status_null_value = status_null_value + self.change_type_value = change_type_value + + @staticmethod + def system_type(): + """ + Attributes: + SystemType (Environment): Requires PYSPARK + """ + return SystemType.PYSPARK + + @staticmethod + def libraries(): + libraries = Libraries() + return libraries + + @staticmethod + def settings() -> dict: + return {} + + def pre_transform_validation(self): + return True + + def post_transform_validation(self): + return True + + def transform(self) -> DataFrame: + """ + Returns: + DataFrame: A dataframe with the specified column converted to PCDM + """ + + mapping = mirico_field_mappings.MIRICO_FIELD_MAPPINGS + df = ( + self.data.withColumn( + self.source_column_name, + from_json(self.source_column_name, "map"), + ) + .withColumn("TagName", map_keys("body")) + .withColumn("Value", map_values("body")) + .select( + map_from_arrays("TagName", "Value").alias("x"), + col("x.timeStamp").alias("EventTime"), + col("x.siteName").alias("SiteName"), + ) + .select("EventTime", "SiteName", posexplode("x")) + .withColumn( + "ValueType", udf(lambda row: mapping[row]["ValueType"])(col("pos")) + ) + .withColumn("Status", lit("Good")) + .withColumn("ChangeType", lit("insert")) + .withColumn("TagName", concat_ws(":", *[col("SiteName"), col("key")])) + ) + return df.select( + "EventTime", "TagName", "Status", "Value", "ValueType", "ChangeType" + ) diff --git a/tests/sdk/python/rtdip_sdk/pipelines/transformers/spark/test_mirico_json_to_pcdm.py b/tests/sdk/python/rtdip_sdk/pipelines/transformers/spark/test_mirico_json_to_pcdm.py new file mode 100644 index 000000000..4926d6793 --- /dev/null +++ b/tests/sdk/python/rtdip_sdk/pipelines/transformers/spark/test_mirico_json_to_pcdm.py @@ -0,0 +1,90 @@ +# Copyright 2022 RTDIP +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import sys + +sys.path.insert(0, ".") +from src.sdk.python.rtdip_sdk.pipelines.transformers.spark.mirico_json_to_pcdm import ( + MiricoJsonToPCDMTransformer, +) +from src.sdk.python.rtdip_sdk.pipelines._pipeline_utils.models import ( + Libraries, + SystemType, +) + +from pyspark.sql import SparkSession, DataFrame +from pyspark.sql.types import StructType, StructField, StringType, TimestampType +from datetime import datetime +import pytest +from src.sdk.python.rtdip_sdk._sdk_utils.compare_versions import ( + _package_version_meets_minimum, +) + +EVENTTIME = "2023-11-03T16:21:16" + + +def test_mirico_json_to_pcdm(spark_session: SparkSession): + mirico_json_data = '{"timeStamp": "2023-11-03T16:21:16", "siteName": "20231016AMEPReleaseTesting1"}' + mirico_df: DataFrame = spark_session.createDataFrame([{"body": mirico_json_data}]) + + expected_schema = StructType( + [ + StructField("EventTime", StringType(), True), + StructField("TagName", StringType(), False), + StructField("Status", StringType(), False), + StructField("Value", StringType(), True), + StructField("ValueType", StringType(), True), + StructField("ChangeType", StringType(), False), + ] + ) + + expected_data = [ + { + "EventTime": EVENTTIME, + "TagName": "20231016AMEPReleaseTesting1:timeStamp", + "Status": "Good", + "Value": "2023-11-03T16:21:16", + "ValueType": "string", + "ChangeType": "insert", + }, + { + "EventTime": EVENTTIME, + "TagName": "20231016AMEPReleaseTesting1:siteName", + "Status": "Good", + "Value": "20231016AMEPReleaseTesting1", + "ValueType": "integer", + "ChangeType": "insert", + }, + ] + + expected_df: DataFrame = spark_session.createDataFrame( + schema=expected_schema, data=expected_data + ) + + try: + if _package_version_meets_minimum("pyspark", "3.4.0"): + mirico_json_to_pcdm_transformer = MiricoJsonToPCDMTransformer( + data=mirico_df, source_column_name="body" + ) + actual_df = mirico_json_to_pcdm_transformer.transform() + + assert mirico_json_to_pcdm_transformer.system_type() == SystemType.PYSPARK + assert isinstance(mirico_json_to_pcdm_transformer.libraries(), Libraries) + assert expected_schema == actual_df.schema + assert expected_df.collect() == actual_df.collect() + except: + with pytest.raises(Exception): + mirico_json_to_pcdm_transformer = MiricoJsonToPCDMTransformer( + data=mirico_df, source_column_name="body" + )