Skip to content

Commit

Permalink
AIO Transformer and Unit Test (#745)
Browse files Browse the repository at this point in the history
* add aio transformer

Signed-off-by: Chloe Ching <[email protected]>

* aio unit test

Signed-off-by: Chloe Ching <[email protected]>

* unit test

Signed-off-by: Chloe Ching <[email protected]>

* add mend

Signed-off-by: Chloe Ching <[email protected]>

* unit tests for aio transformer

Signed-off-by: Chloe Ching <[email protected]>

* add aio transformer to init

Signed-off-by: Chloe Ching <[email protected]>

* add line under spark.py

Signed-off-by: Chloe Ching <[email protected]>

* update eventime in unit tests

Signed-off-by: Chloe Ching <[email protected]>

---------

Signed-off-by: Chloe Ching <[email protected]>
  • Loading branch information
cching95 authored May 20, 2024
1 parent 96872ae commit 5a796a0
Show file tree
Hide file tree
Showing 5 changed files with 205 additions and 12 deletions.
13 changes: 1 addition & 12 deletions .whitesource
Original file line number Diff line number Diff line change
@@ -1,14 +1,3 @@
{
"scanSettings": {
"baseBranches": []
},
"checkRunSettings": {
"vulnerableCheckRunConclusionLevel": "failure",
"displayMode": "diff",
"useMendCheckNames": true
},
"issueSettings": {
"minSeverityLevel": "LOW",
"issueType": "DEPENDENCY"
}
"settingsInheritedFrom": "sede-x/whitesource-config@main"
}
10 changes: 10 additions & 0 deletions src/sdk/python/rtdip_sdk/pipelines/_pipeline_utils/spark.py
Original file line number Diff line number Diff line change
Expand Up @@ -610,3 +610,13 @@ def get_dbutils(
StructField("sourceName", StringType(), True),
]
)

AIO_SCHEMA = MapType(
StringType(),
StructType(
[
StructField("SourceTimestamp", TimestampType(), True),
StructField("Value", StringType(), True),
]
),
)
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
from .spark.fledge_opcua_json_to_pcdm import *
from .spark.ssip_pi_binary_file_to_pcdm import *
from .spark.ssip_pi_binary_json_to_pcdm import *
from .spark.aio_json_to_pcdm import *
from .spark.iso import *
from .spark.edgex_opcua_json_to_pcdm import *
from .spark.ecmwf.nc_extractbase_to_weather_data_model import *
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
# Copyright 2022 RTDIP
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from pyspark.sql import DataFrame
from pyspark.sql.functions import from_json, col, explode, when, lit, expr

from ..interfaces import TransformerInterface
from ..._pipeline_utils.models import Libraries, SystemType
from ..._pipeline_utils.spark import AIO_SCHEMA


class AIOJsonToPCDMTransformer(TransformerInterface):
"""
Converts a Spark Dataframe column containing a json string created by Fledge to the Process Control Data Model.
Example
--------
```python
from rtdip_sdk.pipelines.transformers import AIOJsonToPCDMTransformer
fledge_opcua_json_to_pcdm_transfromer = AIOJsonToPCDMTransformer(
data=df,
souce_column_name="body",
status_null_value="Good",
change_type_value="insert"
)
result = aio_json_to_pcdm_transfromer.transform()
```
Parameters:
data (DataFrame): Dataframe containing the column with Json Fledge data
source_column_name (str): Spark Dataframe column containing the OPC Publisher Json OPC UA data
status_null_value (str): If populated, will replace 'Good' in the Status column with the specified value.
change_type_value (optional str): If populated, will replace 'insert' in the ChangeType column with the specified value.
"""

data: DataFrame
source_column_name: str
status_null_value: str
change_type_value: str

def __init__(
self,
data: DataFrame,
source_column_name: str,
status_null_value: str = "Good",
change_type_value: str = "insert",
) -> None: # NOSONAR
self.data = data
self.source_column_name = source_column_name
self.status_null_value = status_null_value
self.change_type_value = change_type_value

@staticmethod
def system_type():
"""
Attributes:
SystemType (Environment): Requires PYSPARK
"""
return SystemType.PYSPARK

@staticmethod
def libraries():
libraries = Libraries()
return libraries

@staticmethod
def settings() -> dict:
return {}

def pre_transform_validation(self):
return True

def post_transform_validation(self):
return True

def transform(self) -> DataFrame:
"""
Returns:
DataFrame: A dataframe with the specified column converted to PCDM
"""
df = (
self.data.select(
from_json(col(self.source_column_name), "Payload STRING").alias("body")
)
.select(from_json(expr("body.Payload"), AIO_SCHEMA).alias("body"))
.select(explode("body"))
.select(col("key").alias("TagName"), "value.*")
.select(col("SourceTimestamp").alias("EventTime"), "TagName", "Value")
.withColumn("Status", lit(self.status_null_value))
.withColumn(
"ValueType",
when(col("Value").cast("float").isNotNull(), "float").otherwise(
"string"
),
)
.withColumn("ChangeType", lit(self.change_type_value))
)

return df.select(
"EventTime", "TagName", "Status", "Value", "ValueType", "ChangeType"
)
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
# Copyright 2022 RTDIP
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import sys

sys.path.insert(0, ".")
from src.sdk.python.rtdip_sdk.pipelines.transformers.spark.aio_json_to_pcdm import (
AIOJsonToPCDMTransformer,
)
from src.sdk.python.rtdip_sdk.pipelines._pipeline_utils.models import (
Libraries,
SystemType,
)

from pyspark.sql import SparkSession, DataFrame
from pyspark.sql.types import StructType, StructField, StringType, TimestampType
from dateutil import parser


def test_aio_json_to_pcdm(spark_session: SparkSession):
aio_json_data = '{"SequenceNumber":12345,"Timestamp":"2024-05-13T13:05:10.975317Z","DataSetWriterName":"test","MessageType":"test","Payload":{"test_tag1":{"SourceTimestamp":"2024-05-13T13:05:19.7278555Z","Value":67},"test_tag2":{"SourceTimestamp":"2024-05-13T13:05:19.7288616Z","Value":165.5}}}'
aio_df: DataFrame = spark_session.createDataFrame([aio_json_data], "string").toDF(
"body"
)

expected_schema = StructType(
[
StructField("EventTime", TimestampType(), True),
StructField("TagName", StringType(), False),
StructField("Status", StringType(), False),
StructField("Value", StringType(), True),
StructField("ValueType", StringType(), False),
StructField("ChangeType", StringType(), False),
]
)

expected_data = [
{
"TagName": "test_tag1",
"Value": "67",
"EventTime": parser.parse("2024-05-13T13:05:19.7278555Z"),
"Status": "Good",
"ValueType": "float",
"ChangeType": "insert",
},
{
"TagName": "test_tag2",
"Value": "165.5",
"EventTime": parser.parse("2024-05-13T13:05:19.7288616Z"),
"Status": "Good",
"ValueType": "float",
"ChangeType": "insert",
},
]

expected_df: DataFrame = spark_session.createDataFrame(
schema=expected_schema, data=expected_data
)

eventhub_json_to_aio_transformer = AIOJsonToPCDMTransformer(
data=aio_df, source_column_name="body"
)
actual_df = eventhub_json_to_aio_transformer.transform()

assert eventhub_json_to_aio_transformer.system_type() == SystemType.PYSPARK
assert isinstance(eventhub_json_to_aio_transformer.libraries(), Libraries)
assert expected_schema == actual_df.schema
assert expected_df.collect() == actual_df.collect()

0 comments on commit 5a796a0

Please sign in to comment.