diff --git a/docs/sdk/code-reference/pipelines/transformers/spark/opc_publisher_opcae_json_to_pcdm.md b/docs/sdk/code-reference/pipelines/transformers/spark/opc_publisher_opcae_json_to_pcdm.md new file mode 100644 index 000000000..e317f4112 --- /dev/null +++ b/docs/sdk/code-reference/pipelines/transformers/spark/opc_publisher_opcae_json_to_pcdm.md @@ -0,0 +1,2 @@ +# Convert OPC Publisher Json for A&E(Alarm & Events) Data to Process Control Data Model +::: src.sdk.python.rtdip_sdk.pipelines.transformers.spark.opc_publisher_opcae_json_to_pcdm diff --git a/mkdocs.yml b/mkdocs.yml index d2616827c..e73f164d4 100644 --- a/mkdocs.yml +++ b/mkdocs.yml @@ -190,6 +190,7 @@ nav: - Spark: - Binary To String: sdk/code-reference/pipelines/transformers/spark/binary_to_string.md - OPC Publisher Json To Process Control Data Model: sdk/code-reference/pipelines/transformers/spark/opc_publisher_opcua_json_to_pcdm.md + - OPC Publisher Json for A&E(Alarm & Events) Data to Process Control Data Model: sdk/code-reference/pipelines/transformers/spark/opc_publisher_opcae_json_to_pcdm.md - Fledge Json To Process Control Data Model: sdk/code-reference/pipelines/transformers/spark/fledge_opcua_json_to_pcdm.md - EdgeX JSON data To Process Control Data Model: sdk/code-reference/pipelines/transformers/spark/edgex_opcua_json_to_pcdm.md - SEM data To Process Control Data Model: sdk/code-reference/pipelines/transformers/spark/sem_json_to_pcdm.md diff --git a/src/sdk/python/rtdip_sdk/pipelines/_pipeline_utils/spark.py b/src/sdk/python/rtdip_sdk/pipelines/_pipeline_utils/spark.py index 0f60d035e..a9924aabe 100644 --- a/src/sdk/python/rtdip_sdk/pipelines/_pipeline_utils/spark.py +++ b/src/sdk/python/rtdip_sdk/pipelines/_pipeline_utils/spark.py @@ -20,6 +20,7 @@ TimestampType, StringType, BinaryType, + BooleanType, LongType, MapType, IntegerType, @@ -160,6 +161,303 @@ def get_dbutils( ] ) +OPC_PUBLISHER_AE_SCHEMA = StructType( + [ + StructField("NodeId", StringType(), True), + StructField("EndpointUrl", StringType(), True), + StructField("DisplayName", StringType(), True), + StructField( + "Value", + StructType( + [ + StructField( + "ConditionId", + StructType( + [ + StructField("Value", StringType(), True), + StructField("SourceTimestamp", TimestampType(), True), + ] + ), + True, + ), + StructField( + "AckedState", + StructType( + [ + StructField("Value", StringType(), True), + StructField("SourceTimestamp", TimestampType(), True), + ] + ), + True, + ), + StructField( + "AckedState/FalseState", + StructType( + [ + StructField("Value", StringType(), True), + StructField("SourceTimestamp", TimestampType(), True), + ] + ), + True, + ), + StructField( + "AckedState/Id", + StructType( + [ + StructField("Value", BooleanType(), True), + StructField("SourceTimestamp", TimestampType(), True), + ] + ), + True, + ), + StructField( + "AckedState/TrueState", + StructType( + [ + StructField("Value", StringType(), True), + StructField("SourceTimestamp", TimestampType(), True), + ] + ), + True, + ), + StructField( + "ActiveState", + StructType( + [ + StructField("Value", StringType(), True), + StructField("SourceTimestamp", TimestampType(), True), + ] + ), + True, + ), + StructField( + "ActiveState/FalseState", + StructType( + [ + StructField("Value", StringType(), True), + StructField("SourceTimestamp", TimestampType(), True), + ] + ), + True, + ), + StructField( + "ActiveState/Id", + StructType( + [ + StructField("Value", BooleanType(), True), + StructField("SourceTimestamp", TimestampType(), True), + ] + ), + True, + ), + StructField( + "ActiveState/TrueState", + StructType( + [ + StructField("Value", StringType(), True), + StructField("SourceTimestamp", TimestampType(), True), + ] + ), + True, + ), + StructField( + "EnabledState", + StructType( + [ + StructField("Value", StringType(), True), + StructField("SourceTimestamp", TimestampType(), True), + ] + ), + True, + ), + StructField( + "EnabledState/FalseState", + StructType( + [ + StructField("Value", StringType(), True), + StructField("SourceTimestamp", TimestampType(), True), + ] + ), + True, + ), + StructField( + "EnabledState/Id", + StructType( + [ + StructField("Value", BooleanType(), True), + StructField("SourceTimestamp", TimestampType(), True), + ] + ), + True, + ), + StructField( + "EnabledState/TrueState", + StructType( + [ + StructField("Value", StringType(), True), + StructField("SourceTimestamp", TimestampType(), True), + ] + ), + True, + ), + StructField( + "EventId", + StructType( + [ + StructField("Value", StringType(), True), + StructField("SourceTimestamp", TimestampType(), True), + ] + ), + True, + ), + StructField( + "EventType", + StructType( + [ + StructField("Value", StringType(), True), + StructField("SourceTimestamp", TimestampType(), True), + ] + ), + True, + ), + StructField( + "HighHighLimit", + StructType( + [ + StructField("Value", DoubleType(), True), + StructField("SourceTimestamp", TimestampType(), True), + ] + ), + True, + ), + StructField( + "HighLimit", + StructType( + [ + StructField("Value", DoubleType(), True), + StructField("SourceTimestamp", TimestampType(), True), + ] + ), + True, + ), + StructField( + "InputNode", + StructType( + [ + StructField("Value", StringType(), True), + StructField("SourceTimestamp", TimestampType(), True), + ] + ), + True, + ), + StructField( + "LowLimit", + StructType( + [ + StructField("Value", DoubleType(), True), + StructField("SourceTimestamp", TimestampType(), True), + ] + ), + True, + ), + StructField( + "LowLowLimit", + StructType( + [ + StructField("Value", DoubleType(), True), + StructField("SourceTimestamp", TimestampType(), True), + ] + ), + True, + ), + StructField( + "Message", + StructType( + [ + StructField("Value", StringType(), True), + StructField("SourceTimestamp", TimestampType(), True), + ] + ), + True, + ), + StructField( + "Quality", + StructType( + [ + StructField("Value", StringType(), True), + StructField("SourceTimestamp", TimestampType(), True), + ] + ), + True, + ), + StructField( + "ReceiveTime", + StructType( + [ + StructField("Value", TimestampType(), True), + StructField("SourceTimestamp", TimestampType(), True), + ] + ), + True, + ), + StructField( + "Retain", + StructType( + [ + StructField("Value", BooleanType(), True), + StructField("SourceTimestamp", TimestampType(), True), + ] + ), + True, + ), + StructField( + "Severity", + StructType( + [ + StructField("Value", DoubleType(), True), + StructField("SourceTimestamp", TimestampType(), True), + ] + ), + True, + ), + StructField( + "SourceName", + StructType( + [ + StructField("Value", StringType(), True), + StructField("SourceTimestamp", TimestampType(), True), + ] + ), + True, + ), + StructField( + "SourceNode", + StructType( + [ + StructField("Value", StringType(), True), + StructField("SourceTimestamp", TimestampType(), True), + ] + ), + True, + ), + StructField( + "Time", + StructType( + [ + StructField("Value", TimestampType(), True), + StructField("SourceTimestamp", TimestampType(), True), + ] + ), + True, + ), + ] + ), + True, + ), + ] +) + + PROCESS_DATA_MODEL_SCHEMA = StructType( [ StructField("TagName", StringType(), True), diff --git a/src/sdk/python/rtdip_sdk/pipelines/transformers/__init__.py b/src/sdk/python/rtdip_sdk/pipelines/transformers/__init__.py index 2288a130c..8dbccb8b9 100644 --- a/src/sdk/python/rtdip_sdk/pipelines/transformers/__init__.py +++ b/src/sdk/python/rtdip_sdk/pipelines/transformers/__init__.py @@ -14,6 +14,7 @@ from .spark.binary_to_string import * from .spark.opc_publisher_opcua_json_to_pcdm import * +from .spark.opc_publisher_opcae_json_to_pcdm import * from .spark.fledge_opcua_json_to_pcdm import * from .spark.ssip_pi_binary_file_to_pcdm import * from .spark.ssip_pi_binary_json_to_pcdm import * diff --git a/src/sdk/python/rtdip_sdk/pipelines/transformers/spark/opc_publisher_opcae_json_to_pcdm.py b/src/sdk/python/rtdip_sdk/pipelines/transformers/spark/opc_publisher_opcae_json_to_pcdm.py new file mode 100644 index 000000000..7b334ecf9 --- /dev/null +++ b/src/sdk/python/rtdip_sdk/pipelines/transformers/spark/opc_publisher_opcae_json_to_pcdm.py @@ -0,0 +1,170 @@ +# Copyright 2022 RTDIP +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from pyspark.sql import DataFrame +from pyspark.sql.functions import ( + from_json, + col, + explode, + to_timestamp, + coalesce, +) +from pyspark.sql.types import ArrayType, StringType + +from ..interfaces import TransformerInterface +from ..._pipeline_utils.models import Libraries, SystemType +from ..._pipeline_utils.spark import OPC_PUBLISHER_AE_SCHEMA + + +class OPCPublisherOPCAEJsonToPCDMTransformer(TransformerInterface): + """ + Converts a Spark Dataframe column containing a json string created by OPC Publisher for A&E(Alarm &Events) data to the Process Control Data Model. + + Example + -------- + ```python + from rtdip_sdk.pipelines.transformers import OPCPublisherOPCAEJsonToPCDMTransformer + + opc_publisher_opcae_json_to_pcdm_transformer = OPCPublisherOPCAEJsonToPCDMTransformer( + data=df, + souce_column_name="body", + timestamp_formats=[ + "yyyy-MM-dd'T'HH:mm:ss.SSSX", + "yyyy-MM-dd'T'HH:mm:ssX" + ], + filter=None + ) + + result = opc_publisher_opcae_json_to_pcdm_transformer.transform() + ``` + + Parameters: + data (DataFrame): Dataframe containing the column with Json OPC AE data + source_column_name (str): Spark Dataframe column containing the OPC Publisher Json OPC AE data + timestamp_formats (optional list[str]): Specifies the timestamp formats to be used for converting the timestamp string to a Timestamp Type. For more information on formats, refer to this [documentation.](https://spark.apache.org/docs/latest/sql-ref-datetime-pattern.html) + filter (optional str): Enables providing a filter to the data which can be required in certain scenarios. For example, it would be possible to filter on IoT Hub Device Id and Module by providing a filter in SQL format such as `systemProperties.iothub-connection-device-id = "" AND systemProperties.iothub-connection-module-id = ""` + """ + + data: DataFrame + source_column_name: str + timestamp_formats: list + filter: str + + def __init__( + self, + data: DataFrame, + source_column_name: str, + timestamp_formats=None, + filter: str = None, + ) -> None: # NOSONAR + self.data = data + self.source_column_name = source_column_name + self.timestamp_formats = timestamp_formats or [ + "yyyy-MM-dd'T'HH:mm:ss.SSSX", + "yyyy-MM-dd'T'HH:mm:ssX", + ] + self.filter = filter + + @staticmethod + def system_type(): + """ + Attributes: + SystemType (Environment): Requires PYSPARK + """ + return SystemType.PYSPARK + + @staticmethod + def libraries(): + libraries = Libraries() + return libraries + + @staticmethod + def settings() -> dict: + return {} + + def pre_transform_validation(self): + return True + + def post_transform_validation(self): + return True + + def transform(self) -> DataFrame: + """ + Returns: + DataFrame: A dataframe with the OPC Publisher A&E data converted to the Process Control Data Model + """ + + df = self.data.withColumn( + self.source_column_name, + from_json(col(self.source_column_name), ArrayType(StringType())), + ).withColumn(self.source_column_name, explode(self.source_column_name)) + + if self.filter != None: + df = df.where(self.filter) + + df = df.withColumn( + "OPCAE", from_json(col(self.source_column_name), OPC_PUBLISHER_AE_SCHEMA) + ) + + df = df.select( + col("OPCAE.NodeId"), + col("OPCAE.DisplayName"), + col("OPCAE.Value.ConditionId.Value").alias("ConditionId"), + col("OPCAE.Value.AckedState.Value").alias("AckedState"), + col("OPCAE.Value.AckedState/FalseState.Value").alias( + "AckedState/FalseState" + ), + col("OPCAE.Value.AckedState/Id.Value").alias("AckedState/Id"), + col("OPCAE.Value.AckedState/TrueState.Value").alias("AckedState/TrueState"), + col("OPCAE.Value.ActiveState.Value").alias("ActiveState"), + col("OPCAE.Value.ActiveState/FalseState.Value").alias( + "ActiveState/FalseState" + ), + col("OPCAE.Value.ActiveState/Id.Value").alias("ActiveState/Id"), + col("OPCAE.Value.ActiveState/TrueState.Value").alias( + "ActiveState/TrueState" + ), + col("OPCAE.Value.EnabledState.Value").alias("EnabledState"), + col("OPCAE.Value.EnabledState/FalseState.Value").alias( + "EnabledState/FalseState" + ), + col("OPCAE.Value.EnabledState/Id.Value").alias("EnabledState/Id"), + col("OPCAE.Value.EnabledState/TrueState.Value").alias( + "EnabledState/TrueState" + ), + col("OPCAE.Value.EventId.Value").alias("EventId"), + col("OPCAE.Value.EventType.Value").alias("EventType"), + col("OPCAE.Value.HighHighLimit.Value").alias("HighHighLimit"), + col("OPCAE.Value.HighLimit.Value").alias("HighLimit"), + col("OPCAE.Value.InputNode.Value").alias("InputNode"), + col("OPCAE.Value.LowLimit.Value").alias("LowLimit"), + col("OPCAE.Value.LowLowLimit.Value").alias("LowLowLimit"), + col("OPCAE.Value.Message.Value").alias("Message"), + col("OPCAE.Value.Quality.Value").alias("Quality"), + col("OPCAE.Value.ReceiveTime.Value").alias("ReceiveTime"), + col("OPCAE.Value.Retain.Value").alias("Retain"), + col("OPCAE.Value.Severity.Value").alias("Severity"), + col("OPCAE.Value.SourceName.Value").alias("SourceName"), + col("OPCAE.Value.SourceNode.Value").alias("SourceNode"), + col("OPCAE.Value.Time.Value").alias("EventTime"), + ) + + df = df.withColumn( + "EventTime", + coalesce( + *[to_timestamp(col("EventTime"), f) for f in self.timestamp_formats] + ), + ) + + return df diff --git a/tests/sdk/python/rtdip_sdk/pipelines/transformers/spark/test_opc_publisher_opcae_json_to_pcdm.py b/tests/sdk/python/rtdip_sdk/pipelines/transformers/spark/test_opc_publisher_opcae_json_to_pcdm.py new file mode 100644 index 000000000..b9abc1baa --- /dev/null +++ b/tests/sdk/python/rtdip_sdk/pipelines/transformers/spark/test_opc_publisher_opcae_json_to_pcdm.py @@ -0,0 +1,126 @@ +# Copyright 2022 RTDIP +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import sys + +sys.path.insert(0, ".") +from src.sdk.python.rtdip_sdk.pipelines.transformers.spark.opc_publisher_opcae_json_to_pcdm import ( + OPCPublisherOPCAEJsonToPCDMTransformer, +) +from src.sdk.python.rtdip_sdk.pipelines._pipeline_utils.models import ( + Libraries, + SystemType, +) +from src.sdk.python.rtdip_sdk.pipelines._pipeline_utils.spark import ( + OPC_PUBLISHER_AE_SCHEMA, +) + +from pyspark.sql import SparkSession, DataFrame +from pyspark.sql.types import ( + StructType, + StructField, + StringType, + TimestampType, + DoubleType, + BooleanType, +) +from datetime import datetime + + +def test_opc_publisher_json_to_pcdm(spark_session: SparkSession): + opcua_json_data = '[{"NodeId":"ns=6;s=MyLevel.Alarm","EndpointUrl":"opc.tcp://xxxxxxxxx/OPCUA/SimulationServer","DisplayName":"MyLevelAlarm","Value":{"ConditionId":{"Value":"https://www.prosysopc.com/OPCUA/SampleAddressSpace#s=MyLevel.Alarm","SourceTimestamp":"2023-10-19T13:08:08.503Z"},"AckedState":{"Value":"Unacknowledged","SourceTimestamp":"2023-10-19T13:08:08.503Z"},"AckedState/FalseState":{"Value":"Unacknowledged","SourceTimestamp":"2023-10-19T13:08:08.503Z"},"AckedState/Id":{"Value":false,"SourceTimestamp":"2023-10-19T13:08:08.503Z"},"AckedState/TrueState":{"Value":"Acknowledged","SourceTimestamp":"2023-10-19T13:08:08.503Z"},"ActiveState":{"Value":"Inactive","SourceTimestamp":"2023-10-19T13:08:08.503Z"},"ActiveState/FalseState":{"Value":"Inactive","SourceTimestamp":"2023-10-19T13:08:08.503Z"},"ActiveState/Id":{"Value":false,"SourceTimestamp":"2023-10-19T13:08:08.503Z"},"ActiveState/TrueState":{"Value":"Active","SourceTimestamp":"2023-10-19T13:08:08.503Z"},"EnabledState":{"Value":"Enabled","SourceTimestamp":"2023-10-19T13:08:08.503Z"},"EnabledState/FalseState":{"Value":"Disabled","SourceTimestamp":"2023-10-19T13:08:08.503Z"},"EnabledState/Id":{"Value":true,"SourceTimestamp":"2023-10-19T13:08:08.503Z"},"EnabledState/TrueState":{"Value":"Enabled","SourceTimestamp":"2023-10-19T13:08:08.503Z"},"EventId":{"Value":"AAAAAAAAGycAAAAAAAAbJg==","SourceTimestamp":"2023-10-19T13:08:08.503Z"},"EventType":{"Value":"i=9482","SourceTimestamp":"2023-10-19T13:08:08.503Z"},"HighHighLimit":{"Value":90,"SourceTimestamp":"2023-10-19T13:08:08.503Z"},"HighLimit":{"Value":70,"SourceTimestamp":"2023-10-19T13:08:08.503Z"},"InputNode":{"Value":null,"SourceTimestamp":"2023-10-19T13:08:08.503Z"},"LowLimit":{"Value":30,"SourceTimestamp":"2023-10-19T13:08:08.503Z"},"LowLowLimit":{"Value":10,"SourceTimestamp":"2023-10-19T13:08:08.503Z"},"Message":{"Value":"Level exceeded","SourceTimestamp":"2023-10-19T13:08:08.503Z"},"Quality":{"Value":null,"SourceTimestamp":"2023-10-19T13:08:08.503Z"},"ReceiveTime":{"Value":"2023-10-19T13:08:08.503Z","SourceTimestamp":"2023-10-19T13:08:08.503Z"},"Retain":{"Value":true,"SourceTimestamp":"2023-10-19T13:08:08.503Z"},"Severity":{"Value":500,"SourceTimestamp":"2023-10-19T13:08:08.503Z"},"SourceName":{"Value":"MyLevel","SourceTimestamp":"2023-10-19T13:08:08.503Z"},"SourceNode":{"Value":"https://www.prosysopc.com/OPCUA/SampleAddressSpace#s=MyLevel","SourceTimestamp":"2023-10-19T13:08:08.503Z"},"Time":{"Value":"2023-10-19T13:08:08.503Z","SourceTimestamp":"2023-10-19T13:08:08.503Z"}}}]' + opcua_df: DataFrame = spark_session.createDataFrame([{"body": opcua_json_data}]) + + expected_schema = StructType( + [ + StructField("NodeId", StringType(), True), + StructField("DisplayName", StringType(), True), + StructField("ConditionId", StringType(), True), + StructField("AckedState", StringType(), True), + StructField("AckedState/FalseState", StringType(), True), + StructField("AckedState/Id", BooleanType(), True), + StructField("AckedState/TrueState", StringType(), True), + StructField("ActiveState", StringType(), True), + StructField("ActiveState/FalseState", StringType(), True), + StructField("ActiveState/Id", BooleanType(), True), + StructField("ActiveState/TrueState", StringType(), True), + StructField("EnabledState", StringType(), True), + StructField("EnabledState/FalseState", StringType(), True), + StructField("EnabledState/Id", BooleanType(), True), + StructField("EnabledState/TrueState", StringType(), True), + StructField("EventId", StringType(), True), + StructField("EventType", StringType(), True), + StructField("HighHighLimit", DoubleType(), True), + StructField("HighLimit", DoubleType(), True), + StructField("InputNode", StringType(), True), + StructField("LowLimit", DoubleType(), True), + StructField("LowLowLimit", DoubleType(), True), + StructField("Message", StringType(), True), + StructField("Quality", StringType(), True), + StructField("ReceiveTime", TimestampType(), True), + StructField("Retain", BooleanType(), True), + StructField("Severity", DoubleType(), True), + StructField("SourceName", StringType(), True), + StructField("SourceNode", StringType(), True), + StructField("EventTime", TimestampType(), True), + ] + ) + + expected_data = [ + { + "NodeId": "ns=6;s=MyLevel.Alarm", + "DisplayName": "MyLevelAlarm", + "ConditionId": "https://www.prosysopc.com/OPCUA/SampleAddressSpace#s=MyLevel.Alarm", + "AckedState": "Unacknowledged", + "AckedState/FalseState": "Unacknowledged", + "AckedState/Id": False, + "AckedState/TrueState": "Acknowledged", + "ActiveState": "Inactive", + "ActiveState/FalseState": "Inactive", + "ActiveState/Id": False, + "ActiveState/TrueState": "Active", + "EnabledState": "Enabled", + "EnabledState/FalseState": "Disabled", + "EnabledState/Id": True, + "EnabledState/TrueState": "Enabled", + "EventId": "AAAAAAAAGycAAAAAAAAbJg==", + "EventType": "i=9482", + "HighHighLimit": 90.0, + "HighLimit": 70.0, + "InputNode": None, + "LowLimit": 30.0, + "LowLowLimit": 10.0, + "Message": "Level exceeded", + "Quality": None, + "ReceiveTime": datetime.fromisoformat("2023-10-19T13:08:08.503+00:00"), + "Retain": True, + "Severity": 500.0, + "SourceName": "MyLevel", + "SourceNode": "https://www.prosysopc.com/OPCUA/SampleAddressSpace#s=MyLevel", + "EventTime": datetime.fromisoformat("2023-10-19T13:08:08.503+00:00"), + } + ] + + expected_df: DataFrame = spark_session.createDataFrame( + schema=expected_schema, data=expected_data + ) + eventhub_json_to_opcae_transformer = OPCPublisherOPCAEJsonToPCDMTransformer( + opcua_df, source_column_name="body" + ) + actual_df = eventhub_json_to_opcae_transformer.transform() + + assert eventhub_json_to_opcae_transformer.system_type() == SystemType.PYSPARK + assert isinstance(eventhub_json_to_opcae_transformer.libraries(), Libraries) + assert expected_schema == actual_df.schema + assert expected_df.collect() == actual_df.collect()