Mirico Transformer, Documentation and Unit Tests (#608)
* mirico transformer, documentation and unit tests

Signed-off-by: Chloe Ching <[email protected]>

* fix code smells

Signed-off-by: Chloe Ching <[email protected]>

* update

Signed-off-by: Chloe Ching <[email protected]>

* fix bug

Signed-off-by: Chloe Ching <[email protected]>

---------

Signed-off-by: Chloe Ching <[email protected]>
cching95 authored Dec 8, 2023
1 parent dc4aec6 commit 97611f0
Showing 6 changed files with 271 additions and 0 deletions.
@@ -0,0 +1,2 @@
# Convert Mirico JSON to Process Control Data Model
::: src.sdk.python.rtdip_sdk.pipelines.transformers.spark.mirico_json_to_pcdm
1 change: 1 addition & 0 deletions mkdocs.yml
@@ -195,6 +195,7 @@ nav:
- PySpark to Pandas DataFrame Conversion: sdk/code-reference/pipelines/transformers/spark/pyspark_to_pandas.md
- Honeywell APM To Process Control Data Model: sdk/code-reference/pipelines/transformers/spark/honeywell_apm_to_pcdm.md
- Process Control Data Model To Honeywell APM: sdk/code-reference/pipelines/transformers/spark/pcdm_to_honeywell_apm.md
- Mirico Data To Process Control Data Model: sdk/code-reference/pipelines/transformers/spark/mirico_json_to_pcdm.md
- ISO:
- MISO To Meters Data Model: sdk/code-reference/pipelines/transformers/spark/iso/miso_to_mdm.md
- PJM To Meters Data Model: sdk/code-reference/pipelines/transformers/spark/iso/pjm_to_mdm.md
@@ -0,0 +1,21 @@
MIRICO_FIELD_MAPPINGS = {
0: {"TagName": "timeStamp", "ValueType": "string"},
1: {"TagName": "gasTypeId", "ValueType": "integer"},
2: {"TagName": "pathLengthMeters", "ValueType": "float"},
3: {"TagName": "quality", "ValueType": "integer"},
4: {"TagName": "windBearingDegreesTo", "ValueType": "float"},
5: {"TagName": "windSpeedMetersPerSecond", "ValueType": "float"},
6: {"TagName": "pressureMillibar", "ValueType": "float"},
7: {"TagName": "temperatureKelvin", "ValueType": "float"},
8: {"TagName": "gasPpm", "ValueType": "float"},
9: {"TagName": "gasType", "ValueType": "string"},
10: {"TagName": "retroLongitude", "ValueType": "float"},
11: {"TagName": "retroLatitude", "ValueType": "float"},
12: {"TagName": "sensorAltitude", "ValueType": "float"},
13: {"TagName": "sensorLongitude", "ValueType": "float"},
14: {"TagName": "sensorLatitude", "ValueType": "float"},
15: {"TagName": "retroName", "ValueType": "string"},
16: {"TagName": "siteKey", "ValueType": "string"},
17: {"TagName": "siteName", "ValueType": "string"},
18: {"TagName": "retroAltitude", "ValueType": "float"},
}
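The transformer added in this commit resolves each exploded field's `ValueType` by its ordinal position in this mapping. A minimal sketch of that positional lookup, assuming the position lines up with the ordering above:

```python
# Sketch only: positional ValueType lookup as used by the transformer below.
# Assumes `position` matches the ordinal produced by posexplode over the payload.
def value_type_for_position(mappings: dict, position: int) -> str:
    return mappings[position]["ValueType"]

# With the mapping above:
#   value_type_for_position(MIRICO_FIELD_MAPPINGS, 0) -> "string"  (timeStamp)
#   value_type_for_position(MIRICO_FIELD_MAPPINGS, 8) -> "float"   (gasPpm)
```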
25 changes: 25 additions & 0 deletions src/sdk/python/rtdip_sdk/pipelines/_pipeline_utils/spark.py
@@ -25,6 +25,7 @@
IntegerType,
ArrayType,
DoubleType,
FloatType,
)

from .models import Libraries
@@ -311,3 +312,27 @@ def get_dbutils(
StructField("sourceName", StringType(), True),
]
)

MIRICO_SCHEMA = StructType(
[
StructField("retroName", StringType(), True),
StructField("temperatureKelvin", FloatType(), True),
StructField("siteName", StringType(), True),
StructField("pressureMillibar", FloatType(), True),
StructField("windSpeedMetersPerSecond", FloatType(), True),
StructField("windBearingDegreesTo", FloatType(), True),
StructField("pathLengthMeters", FloatType(), True),
StructField("retroAltitude", FloatType(), True),
StructField("sensorAltitude", FloatType(), True),
StructField("quality", IntegerType(), True),
StructField("timeStamp", StringType(), True),
StructField("siteKey", StringType(), True),
StructField("gasTypeId", IntegerType(), True),
StructField("retroLongitude", FloatType(), True),
StructField("gasType", StringType(), True),
StructField("sensorLatitude", FloatType(), True),
StructField("gasPpm", FloatType(), True),
StructField("retroLatitude", FloatType(), True),
StructField("sensorLongitude", FloatType(), True),
]
)
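For reference, a hedged sketch of how `MIRICO_SCHEMA` could be applied to a raw Mirico JSON payload with `from_json`; the transformer in this commit parses the body as `map<string,string>` instead, so this is illustrative only and the sample payload values are invented:

```python
# Illustrative only: schema-based parse of a Mirico JSON payload into typed columns.
from pyspark.sql import SparkSession
from pyspark.sql.functions import from_json, col
from src.sdk.python.rtdip_sdk.pipelines._pipeline_utils.spark import MIRICO_SCHEMA

spark = SparkSession.builder.getOrCreate()
payload = '{"timeStamp": "2023-11-03T16:21:16", "siteName": "SiteA", "gasPpm": 1.9}'
df = spark.createDataFrame([{"body": payload}])

typed_df = df.withColumn("parsed", from_json(col("body"), MIRICO_SCHEMA))
typed_df.select("parsed.timeStamp", "parsed.siteName", "parsed.gasPpm").show()
```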
@@ -0,0 +1,132 @@
# Copyright 2022 RTDIP
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from pyspark.sql import DataFrame
from pyspark.sql.functions import (
from_json,
col,
posexplode,
lit,
udf,
map_from_arrays,
map_keys,
map_values,
concat_ws,
)
from ...._sdk_utils.compare_versions import (
_package_version_meets_minimum,
)
from ..interfaces import TransformerInterface
from ..._pipeline_utils.models import Libraries, SystemType
from ..._pipeline_utils import mirico_field_mappings


class MiricoJsonToPCDMTransformer(TransformerInterface):
"""
Converts a Spark DataFrame column containing a Mirico JSON string to the Process Control Data Model.
Example
--------
```python
from rtdip_sdk.pipelines.transformers import MiricoJsonToPCDMTransformer
mirico_json_to_pcdm_transformer = MiricoJsonToPCDMTransformer(
data=df,
source_column_name="body",
status_null_value="Good",
change_type_value="insert"
)
result = mirico_json_to_pcdm_transformer.transform()
```
Parameters:
data (DataFrame): DataFrame containing the column with Mirico data
source_column_name (str): Spark DataFrame column containing the Mirico JSON data
status_null_value (optional str): If populated, will replace 'Good' in the Status column with the specified value.
change_type_value (optional str): If populated, will replace 'insert' in the ChangeType column with the specified value.
"""

data: DataFrame
source_column_name: str
status_null_value: str
change_type_value: str

def __init__(
self,
data: DataFrame,
source_column_name: str,
status_null_value: str = "Good",
change_type_value: str = "insert",
) -> None:
_package_version_meets_minimum("pyspark", "3.4.0")
self.data = data
self.source_column_name = source_column_name
self.status_null_value = status_null_value
self.change_type_value = change_type_value

@staticmethod
def system_type():
"""
Attributes:
SystemType (Environment): Requires PYSPARK
"""
return SystemType.PYSPARK

@staticmethod
def libraries():
libraries = Libraries()
return libraries

@staticmethod
def settings() -> dict:
return {}

def pre_transform_validation(self):
return True

def post_transform_validation(self):
return True

def transform(self) -> DataFrame:
"""
Returns:
DataFrame: A dataframe with the specified column converted to PCDM
"""

mapping = mirico_field_mappings.MIRICO_FIELD_MAPPINGS
df = (
self.data.withColumn(
self.source_column_name,
from_json(self.source_column_name, "map<string,string>"),
)
.withColumn("TagName", map_keys("body"))
.withColumn("Value", map_values("body"))
.select(
map_from_arrays("TagName", "Value").alias("x"),
col("x.timeStamp").alias("EventTime"),
col("x.siteName").alias("SiteName"),
)
.select("EventTime", "SiteName", posexplode("x"))
.withColumn(
"ValueType", udf(lambda row: mapping[row]["ValueType"])(col("pos"))
)
.withColumn("Status", lit("Good"))
.withColumn("ChangeType", lit("insert"))
.withColumn("TagName", concat_ws(":", *[col("SiteName"), col("key")]))
)
return df.select(
"EventTime", "TagName", "Status", "Value", "ValueType", "ChangeType"
)
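A short usage sketch of the transformer on a locally constructed DataFrame, mirroring the unit test below; the Spark session setup and sample payload are illustrative:

```python
# Usage sketch (illustrative): requires pyspark >= 3.4.0.
from pyspark.sql import SparkSession
from rtdip_sdk.pipelines.transformers import MiricoJsonToPCDMTransformer

spark = SparkSession.builder.getOrCreate()
payload = '{"timeStamp": "2023-11-03T16:21:16", "siteName": "SiteA", "gasPpm": 1.9}'
source_df = spark.createDataFrame([{"body": payload}])

pcdm_df = MiricoJsonToPCDMTransformer(
    data=source_df,
    source_column_name="body",
).transform()

# Output columns: EventTime, TagName, Status, Value, ValueType, ChangeType
pcdm_df.show(truncate=False)
```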
@@ -0,0 +1,90 @@
# Copyright 2022 RTDIP
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import sys

sys.path.insert(0, ".")
from src.sdk.python.rtdip_sdk.pipelines.transformers.spark.mirico_json_to_pcdm import (
MiricoJsonToPCDMTransformer,
)
from src.sdk.python.rtdip_sdk.pipelines._pipeline_utils.models import (
Libraries,
SystemType,
)

from pyspark.sql import SparkSession, DataFrame
from pyspark.sql.types import StructType, StructField, StringType
import pytest
from src.sdk.python.rtdip_sdk._sdk_utils.compare_versions import (
_package_version_meets_minimum,
)

EVENTTIME = "2023-11-03T16:21:16"


def test_mirico_json_to_pcdm(spark_session: SparkSession):
mirico_json_data = '{"timeStamp": "2023-11-03T16:21:16", "siteName": "20231016AMEPReleaseTesting1"}'
mirico_df: DataFrame = spark_session.createDataFrame([{"body": mirico_json_data}])

expected_schema = StructType(
[
StructField("EventTime", StringType(), True),
StructField("TagName", StringType(), False),
StructField("Status", StringType(), False),
StructField("Value", StringType(), True),
StructField("ValueType", StringType(), True),
StructField("ChangeType", StringType(), False),
]
)

expected_data = [
{
"EventTime": EVENTTIME,
"TagName": "20231016AMEPReleaseTesting1:timeStamp",
"Status": "Good",
"Value": "2023-11-03T16:21:16",
"ValueType": "string",
"ChangeType": "insert",
},
{
"EventTime": EVENTTIME,
"TagName": "20231016AMEPReleaseTesting1:siteName",
"Status": "Good",
"Value": "20231016AMEPReleaseTesting1",
"ValueType": "integer",
"ChangeType": "insert",
},
]

expected_df: DataFrame = spark_session.createDataFrame(
schema=expected_schema, data=expected_data
)

try:
if _package_version_meets_minimum("pyspark", "3.4.0"):
mirico_json_to_pcdm_transformer = MiricoJsonToPCDMTransformer(
data=mirico_df, source_column_name="body"
)
actual_df = mirico_json_to_pcdm_transformer.transform()

assert mirico_json_to_pcdm_transformer.system_type() == SystemType.PYSPARK
assert isinstance(mirico_json_to_pcdm_transformer.libraries(), Libraries)
assert expected_schema == actual_df.schema
assert expected_df.collect() == actual_df.collect()
except Exception:
with pytest.raises(Exception):
mirico_json_to_pcdm_transformer = MiricoJsonToPCDMTransformer(
data=mirico_df, source_column_name="body"
)
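The test relies on a `spark_session` pytest fixture provided elsewhere in the repository's test suite; a minimal local fixture sketch, in case one is needed when running the test in isolation (an assumption, not part of this commit):

```python
# Assumed fixture sketch; the repository's conftest.py is expected to provide
# an equivalent `spark_session` fixture already.
import pytest
from pyspark.sql import SparkSession


@pytest.fixture(scope="session")
def spark_session():
    spark = (
        SparkSession.builder.master("local[2]")
        .appName("mirico-json-to-pcdm-tests")
        .getOrCreate()
    )
    yield spark
    spark.stop()
```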
