Skip to content

Commit

Permalink
Mirico Transformer TagName Column Updates (#708)
Browse files Browse the repository at this point in the history
* updated mirico transformer tagname column

Signed-off-by: Chloe Ching <[email protected]>

* update casing in mirico transformer

Signed-off-by: Chloe Ching <[email protected]>

---------

Signed-off-by: Chloe Ching <[email protected]>
  • Loading branch information
cching95 authored Mar 19, 2024
1 parent fb5897f commit 87b6c25
Show file tree
Hide file tree
Showing 3 changed files with 57 additions and 21 deletions.
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
MIRICO_FIELD_MAPPINGS = {
0: {"TagName": "timeStamp", "ValueType": "string"},
1: {"TagName": "gasTypeId", "ValueType": "integer"},
1: {"TagName": "gasTypeId", "ValueType": "float"},
2: {"TagName": "pathLengthMeters", "ValueType": "float"},
3: {"TagName": "quality", "ValueType": "integer"},
3: {"TagName": "quality", "ValueType": "float"},
4: {"TagName": "windBearingDegreesTo", "ValueType": "float"},
5: {"TagName": "windSpeedMetersPerSecond", "ValueType": "float"},
6: {"TagName": "pressureMillibar", "ValueType": "float"},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -122,10 +122,12 @@ def transform(self) -> DataFrame:
.withColumn("Value", map_values("body"))
.select(
map_from_arrays("TagName", "Value").alias("x"),
to_timestamp(col("x.timeStamp")).alias("EventTime"),
col("x.siteName").alias("SiteName"),
col("x.timeStamp").alias("EventTime"),
col("x.siteName").alias("siteName"),
col("x.gasType").alias("gasType"),
col("x.retroName").alias("retroName"),
)
.select("EventTime", "SiteName", posexplode("x"))
.select("EventTime", "siteName", "gasType", "retroName", posexplode("x"))
.withColumn(
"ValueType", udf(lambda row: mapping[row]["ValueType"])(col("pos"))
)
Expand All @@ -140,14 +142,52 @@ def transform(self) -> DataFrame:
*[
upper(lit(self.tagname_field)),
concat_ws(
"_", *[upper(col("SiteName")), upper(col("key"))]
"_",
*[
upper(col("siteName")),
upper(col("retroName")),
when(
upper(col("key")) == "GASPPM",
concat_ws(
"_",
*[upper(col("key")), upper(col("gasType"))]
),
).otherwise(upper(col("key"))),
]
),
]
),
).otherwise(
concat_ws("_", *[upper(col("SiteName")), upper(col("key"))])
concat_ws(
"_",
*[
upper(col("siteName")),
upper(col("retroName")),
when(
upper(col("key")) == "GASPPM",
concat_ws(
"_", *[upper(col("key")), upper(col("gasType"))]
),
).otherwise(upper(col("key"))),
]
)
),
)
.filter(
~col("key").isin(
"timeStamp",
"gasType",
"retroLongitude",
"retroLatitude",
"retroAltitude",
"sensorLongitude",
"sensorLatitude",
"sensorAltitude",
"siteName",
"siteKey",
"retroName",
)
)
)
return df.select(
"EventTime", "TagName", "Status", "Value", "ValueType", "ChangeType"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,18 +31,14 @@
_package_version_meets_minimum,
)

EVENTTIME = datetime.fromisoformat("2023-11-03T16:21:16")


def test_mirico_json_to_pcdm(spark_session: SparkSession):
mirico_json_data = (
'{"timeStamp": "2023-11-03T16:21:16", "siteName": "test_site_name"}'
)
mirico_json_data = '{"timeStamp": "2023-11-03T16:21:16", "siteName": "test_site_name", "gasTypeId": 3, "quality": 10}'
mirico_df: DataFrame = spark_session.createDataFrame([{"body": mirico_json_data}])

expected_schema = StructType(
[
StructField("EventTime", TimestampType(), True),
StructField("EventTime", StringType(), True),
StructField("TagName", StringType(), False),
StructField("Status", StringType(), False),
StructField("Value", StringType(), True),
Expand All @@ -53,19 +49,19 @@ def test_mirico_json_to_pcdm(spark_session: SparkSession):

expected_data = [
{
"EventTime": EVENTTIME,
"TagName": "TEST_SITE_NAME_TIMESTAMP",
"EventTime": "2023-11-03T16:21:16",
"TagName": "TEST_SITE_NAME_GASTYPEID",
"Status": "Good",
"Value": "2023-11-03T16:21:16",
"ValueType": "string",
"Value": 3,
"ValueType": "float",
"ChangeType": "insert",
},
{
"EventTime": EVENTTIME,
"TagName": "TEST_SITE_NAME_SITENAME",
"EventTime": "2023-11-03T16:21:16",
"TagName": "TEST_SITE_NAME_QUALITY",
"Status": "Good",
"Value": "test_site_name",
"ValueType": "integer",
"Value": 10,
"ValueType": "float",
"ChangeType": "insert",
},
]
Expand Down

0 comments on commit 87b6c25

Please sign in to comment.