Skip to content

Commit

Permalink
fix EventTime column data type (#629)
Browse files Browse the repository at this point in the history
Signed-off-by: Chloe Ching <[email protected]>
  • Loading branch information
cching95 authored Jan 10, 2024
1 parent 0aee809 commit d27b768
Show file tree
Hide file tree
Showing 3 changed files with 4 additions and 28 deletions.
24 changes: 0 additions & 24 deletions src/sdk/python/rtdip_sdk/pipelines/_pipeline_utils/spark.py
Original file line number Diff line number Diff line change
Expand Up @@ -610,27 +610,3 @@ def get_dbutils(
StructField("sourceName", StringType(), True),
]
)

MIRICO_SCHEMA = StructType(
[
StructField("retroName", StringType(), True),
StructField("temperatureKelvin", FloatType(), True),
StructField("siteName", StringType(), True),
StructField("pressureMillibar", FloatType(), True),
StructField("windSpeedMetersPerSecond", FloatType(), True),
StructField("windBearingDegreesTo", FloatType(), True),
StructField("pathLengthMeters", FloatType(), True),
StructField("retroAltitude", FloatType(), True),
StructField("sensorAltitude", FloatType(), True),
StructField("quality", IntegerType(), True),
StructField("timeStamp", StringType(), True),
StructField("siteKey", StringType(), True),
StructField("gasTypeId", IntegerType(), True),
StructField("retroLongitude", FloatType(), True),
StructField("gasType", StringType(), True),
StructField("sensorLatitude", FloatType(), True),
StructField("gasPpm", FloatType(), True),
StructField("retroLatitude", FloatType(), True),
StructField("sensorLongitude", FloatType(), True),
]
)
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,13 @@
map_keys,
map_values,
concat_ws,
to_timestamp,
)
from ...._sdk_utils.compare_versions import (
_package_version_meets_minimum,
)
from ..interfaces import TransformerInterface
from ..._pipeline_utils.models import Libraries, SystemType
from ..._pipeline_utils.spark import SEM_SCHEMA
from ..._pipeline_utils import mirico_field_mappings


Expand Down Expand Up @@ -116,7 +116,7 @@ def transform(self) -> DataFrame:
.withColumn("Value", map_values("body"))
.select(
map_from_arrays("TagName", "Value").alias("x"),
col("x.timeStamp").alias("EventTime"),
to_timestamp(col("x.timeStamp")).alias("EventTime"),
col("x.siteName").alias("SiteName"),
)
.select("EventTime", "SiteName", posexplode("x"))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
_package_version_meets_minimum,
)

EVENTTIME = "2023-11-03T16:21:16"
EVENTTIME = datetime.fromisoformat("2023-11-03T16:21:16")


def test_mirico_json_to_pcdm(spark_session: SparkSession):
Expand All @@ -40,7 +40,7 @@ def test_mirico_json_to_pcdm(spark_session: SparkSession):

expected_schema = StructType(
[
StructField("EventTime", StringType(), True),
StructField("EventTime", TimestampType(), True),
StructField("TagName", StringType(), False),
StructField("Status", StringType(), False),
StructField("Value", StringType(), True),
Expand Down

0 comments on commit d27b768

Please sign in to comment.