diff --git a/docs/sdk/code-reference/pipelines/transformers/spark/iso/caiso_to_mdm.md b/docs/sdk/code-reference/pipelines/transformers/spark/iso/caiso_to_mdm.md new file mode 100644 index 000000000..aa43a4e1a --- /dev/null +++ b/docs/sdk/code-reference/pipelines/transformers/spark/iso/caiso_to_mdm.md @@ -0,0 +1 @@ +::: src.sdk.python.rtdip_sdk.pipelines.transformers.spark.iso.caiso_to_mdm diff --git a/docs/sdk/pipelines/components.md b/docs/sdk/pipelines/components.md index 4d06716ff..93d0b3645 100644 --- a/docs/sdk/pipelines/components.md +++ b/docs/sdk/pipelines/components.md @@ -70,6 +70,7 @@ Transformers are components that perform transformations on data. These will tar |[Raw Forecast to Weather Data Model](../code-reference/pipelines/transformers/spark/the_weather_company/raw_forecast_to_weather_data_model.md)||:heavy_check_mark:|:heavy_check_mark:|:heavy_check_mark:|:heavy_check_mark:| |[PJM To Meters Data Model](../code-reference/pipelines/transformers/spark/iso/pjm_to_mdm.md)||:heavy_check_mark:|:heavy_check_mark:|:heavy_check_mark:|:heavy_check_mark:| |[ERCOT To Meters Data Model](../code-reference/pipelines/transformers/spark/iso/ercot_to_mdm.md)||:heavy_check_mark:|:heavy_check_mark:|:heavy_check_mark:|:heavy_check_mark:| +|[CAISO To Meters Data Model](../code-reference/pipelines/transformers/spark/iso/caiso_to_mdm.md)||:heavy_check_mark:|:heavy_check_mark:|:heavy_check_mark:|:heavy_check_mark:| !!! note "Note" This list will dynamically change as the framework is further developed and new components are added. diff --git a/mkdocs.yml b/mkdocs.yml index 13d101dbe..c3f483c67 100644 --- a/mkdocs.yml +++ b/mkdocs.yml @@ -203,6 +203,7 @@ nav: - MISO To Meters Data Model: sdk/code-reference/pipelines/transformers/spark/iso/miso_to_mdm.md - PJM To Meters Data Model: sdk/code-reference/pipelines/transformers/spark/iso/pjm_to_mdm.md - ERCOT To Meters Data Model: sdk/code-reference/pipelines/transformers/spark/iso/ercot_to_mdm.md + - CAISO To Meters Data Model: sdk/code-reference/pipelines/transformers/spark/iso/caiso_to_mdm.md - The Weather Company: - Raw Forecast To Weather Data Model: sdk/code-reference/pipelines/transformers/spark/the_weather_company/raw_forecast_to_weather_data_model.md - ECMWF: diff --git a/src/sdk/python/rtdip_sdk/pipelines/_pipeline_utils/spark.py b/src/sdk/python/rtdip_sdk/pipelines/_pipeline_utils/spark.py index a9924aabe..5bd278e4b 100644 --- a/src/sdk/python/rtdip_sdk/pipelines/_pipeline_utils/spark.py +++ b/src/sdk/python/rtdip_sdk/pipelines/_pipeline_utils/spark.py @@ -610,27 +610,3 @@ def get_dbutils( StructField("sourceName", StringType(), True), ] ) - -MIRICO_SCHEMA = StructType( - [ - StructField("retroName", StringType(), True), - StructField("temperatureKelvin", FloatType(), True), - StructField("siteName", StringType(), True), - StructField("pressureMillibar", FloatType(), True), - StructField("windSpeedMetersPerSecond", FloatType(), True), - StructField("windBearingDegreesTo", FloatType(), True), - StructField("pathLengthMeters", FloatType(), True), - StructField("retroAltitude", FloatType(), True), - StructField("sensorAltitude", FloatType(), True), - StructField("quality", IntegerType(), True), - StructField("timeStamp", StringType(), True), - StructField("siteKey", StringType(), True), - StructField("gasTypeId", IntegerType(), True), - StructField("retroLongitude", FloatType(), True), - StructField("gasType", StringType(), True), - StructField("sensorLatitude", FloatType(), True), - StructField("gasPpm", FloatType(), True), - StructField("retroLatitude", FloatType(), True), - StructField("sensorLongitude", FloatType(), True), - ] -) diff --git a/src/sdk/python/rtdip_sdk/pipelines/transformers/spark/iso/__init__.py b/src/sdk/python/rtdip_sdk/pipelines/transformers/spark/iso/__init__.py index 583a5bd99..f2a1dd5e4 100644 --- a/src/sdk/python/rtdip_sdk/pipelines/transformers/spark/iso/__init__.py +++ b/src/sdk/python/rtdip_sdk/pipelines/transformers/spark/iso/__init__.py @@ -1,3 +1,4 @@ +from .caiso_to_mdm import CAISOToMDMTransformer from .ercot_to_mdm import ERCOTToMDMTransformer from .miso_to_mdm import MISOToMDMTransformer from .pjm_to_mdm import PJMToMDMTransformer diff --git a/src/sdk/python/rtdip_sdk/pipelines/transformers/spark/iso/caiso_to_mdm.py b/src/sdk/python/rtdip_sdk/pipelines/transformers/spark/iso/caiso_to_mdm.py new file mode 100644 index 000000000..ff16b2775 --- /dev/null +++ b/src/sdk/python/rtdip_sdk/pipelines/transformers/spark/iso/caiso_to_mdm.py @@ -0,0 +1,72 @@ +# Copyright 2022 RTDIP +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from pyspark.sql import DataFrame, SparkSession + +from ..base_raw_to_mdm import BaseRawToMDMTransformer +from ...._pipeline_utils.iso import CAISO_SCHEMA +from .....data_models.timeseries import SeriesType, ModelType, ValueType + + +class CAISOToMDMTransformer(BaseRawToMDMTransformer): + """ + Converts CAISO Raw data into Meters Data Model. + + Please check the BaseRawToMDMTransformer for the required arguments and methods. + + Example + -------- + ```python + from rtdip_sdk.pipelines.transformers import CAISOToMDMTransformer + from rtdip_sdk.pipelines.utilities import SparkSessionUtility + + # Not required if using Databricks + spark = SparkSessionUtility(config={}).execute() + + caiso_to_mdm_transformer = CAISOToMDMTransformer( + spark=spark, + data=df, + output_type="usage", + name=None, + description=None, + value_type=None, + version=None, + series_id=None, + series_parent_id=None + ) + + result = caiso_to_mdm_transformer.transform() + ``` + """ + + spark: SparkSession + data: DataFrame + input_schema = CAISO_SCHEMA + uid_col = "TacAreaName" + series_id_col = "'series_std_001'" + timestamp_col = "to_timestamp(StartTime)" + interval_timestamp_col = "Timestamp + INTERVAL 1 HOURS" + value_col = "Load" + series_parent_id_col = "'series_parent_std_001'" + name_col = "'CAISO API'" + uom_col = "'mwh'" + description_col = "'CAISO data pulled from CAISO ISO API'" + timestamp_start_col = "StartTime" + timestamp_end_col = "StartTime + INTERVAL 1 HOURS" + time_zone_col = "'PST'" + version_col = "'1'" + series_type = SeriesType.Hour + model_type = ModelType.Default + value_type = ValueType.Usage + properties_col = "null" diff --git a/src/sdk/python/rtdip_sdk/pipelines/transformers/spark/mirico_json_to_pcdm.py b/src/sdk/python/rtdip_sdk/pipelines/transformers/spark/mirico_json_to_pcdm.py index 9a5751970..7d77afd15 100644 --- a/src/sdk/python/rtdip_sdk/pipelines/transformers/spark/mirico_json_to_pcdm.py +++ b/src/sdk/python/rtdip_sdk/pipelines/transformers/spark/mirico_json_to_pcdm.py @@ -23,13 +23,13 @@ map_keys, map_values, concat_ws, + to_timestamp, ) from ...._sdk_utils.compare_versions import ( _package_version_meets_minimum, ) from ..interfaces import TransformerInterface from ..._pipeline_utils.models import Libraries, SystemType -from ..._pipeline_utils.spark import SEM_SCHEMA from ..._pipeline_utils import mirico_field_mappings @@ -116,7 +116,7 @@ def transform(self) -> DataFrame: .withColumn("Value", map_values("body")) .select( map_from_arrays("TagName", "Value").alias("x"), - col("x.timeStamp").alias("EventTime"), + to_timestamp(col("x.timeStamp")).alias("EventTime"), col("x.siteName").alias("SiteName"), ) .select("EventTime", "SiteName", posexplode("x")) diff --git a/tests/sdk/python/rtdip_sdk/pipelines/transformers/spark/iso/test_caiso_to_mdm.py b/tests/sdk/python/rtdip_sdk/pipelines/transformers/spark/iso/test_caiso_to_mdm.py new file mode 100644 index 000000000..5f7404adf --- /dev/null +++ b/tests/sdk/python/rtdip_sdk/pipelines/transformers/spark/iso/test_caiso_to_mdm.py @@ -0,0 +1,79 @@ +# Copyright 2022 RTDIP +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import os + +from pyspark.sql import SparkSession, DataFrame + +from src.sdk.python.rtdip_sdk.pipelines._pipeline_utils.iso import CAISO_SCHEMA +from src.sdk.python.rtdip_sdk.pipelines._pipeline_utils.mdm import ( + MDM_USAGE_SCHEMA, + MDM_META_SCHEMA, +) +from src.sdk.python.rtdip_sdk.pipelines._pipeline_utils.models import ( + Libraries, + SystemType, +) +from src.sdk.python.rtdip_sdk.pipelines.transformers import CAISOToMDMTransformer + +parent_base_path: str = os.path.join( + os.path.dirname(os.path.realpath(__file__)), "test_data" +) + + +def caiso_to_mdm_test( + spark_session: SparkSession, + output_type: str, + expected_df: DataFrame, + base_path: str, +): + input_df: DataFrame = spark_session.read.csv( + f"{base_path}/input.csv", header=True, schema=CAISO_SCHEMA + ) + + transformer = CAISOToMDMTransformer( + spark_session, input_df, output_type=output_type + ) + assert transformer.system_type() == SystemType.PYSPARK + assert isinstance(transformer.libraries(), Libraries) + assert transformer.settings() == dict() + + actual_df = transformer.transform() + + cols = list( + filter( + lambda column: column in expected_df.columns, + ["Uid", "Timestamp", "TimestampStart"], + ) + ) + assert actual_df.orderBy(cols).collect() == expected_df.orderBy(cols).collect() + + +def test_caiso_to_mdm_usage(spark_session: SparkSession): + usage_path = os.path.join(parent_base_path, "caiso_usage") + usage_expected_df: DataFrame = spark_session.read.csv( + f"{usage_path}/output.csv", header=True, schema=MDM_USAGE_SCHEMA + ) + + caiso_to_mdm_test(spark_session, "usage", usage_expected_df, usage_path) + + +def test_caiso_to_mdm_meta(spark_session: SparkSession): + meta_path: str = os.path.join(parent_base_path, "caiso_meta") + + meta_expected_df: DataFrame = spark_session.read.json( + f"{meta_path}/output.json", schema=MDM_META_SCHEMA + ) + + caiso_to_mdm_test(spark_session, "meta", meta_expected_df, meta_path) diff --git a/tests/sdk/python/rtdip_sdk/pipelines/transformers/spark/iso/test_data/caiso_meta/input.csv b/tests/sdk/python/rtdip_sdk/pipelines/transformers/spark/iso/test_data/caiso_meta/input.csv new file mode 100644 index 000000000..d936f42f1 --- /dev/null +++ b/tests/sdk/python/rtdip_sdk/pipelines/transformers/spark/iso/test_data/caiso_meta/input.csv @@ -0,0 +1,5 @@ +StartTime,EndTime,LoadType,OprDt,OprHr,OprInterval,MarketRunId,TacAreaName,Label,XmlDataItem,Pos,Load,ExecutionType,Group +2023-12-13T07:00:00,2023-12-13T08:00:00,0,2023-12-12,24,0,ACTUAL,SDGE-TAC,Total Actual Hourly Integrated Load,SYS_FCST_ACT_MW,3.4,2077.0,ACTUAL,95 +2023-12-13T01:00:00,2023-12-13T02:00:00,0,2023-12-12,18,0,ACTUAL,SDGE-TAC,Total Actual Hourly Integrated Load,SYS_FCST_ACT_MW,3.4,2758.0,ACTUAL,95 +2023-12-13T07:00:00,2023-12-13T08:00:00,0,2023-12-12,24,0,ACTUAL,SRP,Total Actual Hourly Integrated Load,SYS_FCST_ACT_MW,3.8,2900.0,ACTUAL,96 +2023-12-13T03:00:00,2023-12-13T04:00:00,0,2023-12-12,20,0,ACTUAL,SRP,Total Actual Hourly Integrated Load,SYS_FCST_ACT_MW,3.8,3342.0,ACTUAL,96 diff --git a/tests/sdk/python/rtdip_sdk/pipelines/transformers/spark/iso/test_data/caiso_meta/output.json b/tests/sdk/python/rtdip_sdk/pipelines/transformers/spark/iso/test_data/caiso_meta/output.json new file mode 100644 index 000000000..1e37e1c04 --- /dev/null +++ b/tests/sdk/python/rtdip_sdk/pipelines/transformers/spark/iso/test_data/caiso_meta/output.json @@ -0,0 +1,4 @@ +{"Uid":"SDGE-TAC","SeriesId":"series_std_001","SeriesParentId":"series_parent_std_001","Name":"CAISO API","Uom":"mwh","Description":"CAISO data pulled from CAISO ISO API","TimestampStart":"2023-12-13T07:00:00","TimestampEnd":"2023-12-13T08:00:00","Timezone":"PST","Version":"1","SeriesType":64,"ModelType":1,"ValueType":16} +{"Uid":"SDGE-TAC","SeriesId":"series_std_001","SeriesParentId":"series_parent_std_001","Name":"CAISO API","Uom":"mwh","Description":"CAISO data pulled from CAISO ISO API","TimestampStart":"2023-12-13T01:00:00","TimestampEnd":"2023-12-13T02:00:00","Timezone":"PST","Version":"1","SeriesType":64,"ModelType":1,"ValueType":16} +{"Uid":"SRP","SeriesId":"series_std_001","SeriesParentId":"series_parent_std_001","Name":"CAISO API","Uom":"mwh","Description":"CAISO data pulled from CAISO ISO API","TimestampStart":"2023-12-13T07:00:00","TimestampEnd":"2023-12-13T08:00:00","Timezone":"PST","Version":"1","SeriesType":64,"ModelType":1,"ValueType":16} +{"Uid":"SRP","SeriesId":"series_std_001","SeriesParentId":"series_parent_std_001","Name":"CAISO API","Uom":"mwh","Description":"CAISO data pulled from CAISO ISO API","TimestampStart":"2023-12-13T03:00:00","TimestampEnd":"2023-12-13T04:00:00","Timezone":"PST","Version":"1","SeriesType":64,"ModelType":1,"ValueType":16} diff --git a/tests/sdk/python/rtdip_sdk/pipelines/transformers/spark/iso/test_data/caiso_usage/input.csv b/tests/sdk/python/rtdip_sdk/pipelines/transformers/spark/iso/test_data/caiso_usage/input.csv new file mode 100644 index 000000000..d936f42f1 --- /dev/null +++ b/tests/sdk/python/rtdip_sdk/pipelines/transformers/spark/iso/test_data/caiso_usage/input.csv @@ -0,0 +1,5 @@ +StartTime,EndTime,LoadType,OprDt,OprHr,OprInterval,MarketRunId,TacAreaName,Label,XmlDataItem,Pos,Load,ExecutionType,Group +2023-12-13T07:00:00,2023-12-13T08:00:00,0,2023-12-12,24,0,ACTUAL,SDGE-TAC,Total Actual Hourly Integrated Load,SYS_FCST_ACT_MW,3.4,2077.0,ACTUAL,95 +2023-12-13T01:00:00,2023-12-13T02:00:00,0,2023-12-12,18,0,ACTUAL,SDGE-TAC,Total Actual Hourly Integrated Load,SYS_FCST_ACT_MW,3.4,2758.0,ACTUAL,95 +2023-12-13T07:00:00,2023-12-13T08:00:00,0,2023-12-12,24,0,ACTUAL,SRP,Total Actual Hourly Integrated Load,SYS_FCST_ACT_MW,3.8,2900.0,ACTUAL,96 +2023-12-13T03:00:00,2023-12-13T04:00:00,0,2023-12-12,20,0,ACTUAL,SRP,Total Actual Hourly Integrated Load,SYS_FCST_ACT_MW,3.8,3342.0,ACTUAL,96 diff --git a/tests/sdk/python/rtdip_sdk/pipelines/transformers/spark/iso/test_data/caiso_usage/output.csv b/tests/sdk/python/rtdip_sdk/pipelines/transformers/spark/iso/test_data/caiso_usage/output.csv new file mode 100644 index 000000000..1d3005952 --- /dev/null +++ b/tests/sdk/python/rtdip_sdk/pipelines/transformers/spark/iso/test_data/caiso_usage/output.csv @@ -0,0 +1,5 @@ +Uid,SeriesId,Timestamp,IntervalTimestamp,Value +SDGE-TAC,series_std_001,2023-12-13T07:00:00,2023-12-13T08:00:00,2077.0 +SDGE-TAC,series_std_001,2023-12-13T01:00:00,2023-12-13T02:00:00,2758.0 +SRP,series_std_001,2023-12-13T07:00:00,2023-12-13T08:00:00,2900.0 +SRP,series_std_001,2023-12-13T03:00:00,2023-12-13T04:00:00,3342.0 diff --git a/tests/sdk/python/rtdip_sdk/pipelines/transformers/spark/test_mirico_json_to_pcdm.py b/tests/sdk/python/rtdip_sdk/pipelines/transformers/spark/test_mirico_json_to_pcdm.py index 4926d6793..645268c60 100644 --- a/tests/sdk/python/rtdip_sdk/pipelines/transformers/spark/test_mirico_json_to_pcdm.py +++ b/tests/sdk/python/rtdip_sdk/pipelines/transformers/spark/test_mirico_json_to_pcdm.py @@ -31,7 +31,7 @@ _package_version_meets_minimum, ) -EVENTTIME = "2023-11-03T16:21:16" +EVENTTIME = datetime.fromisoformat("2023-11-03T16:21:16") def test_mirico_json_to_pcdm(spark_session: SparkSession): @@ -40,7 +40,7 @@ def test_mirico_json_to_pcdm(spark_session: SparkSession): expected_schema = StructType( [ - StructField("EventTime", StringType(), True), + StructField("EventTime", TimestampType(), True), StructField("TagName", StringType(), False), StructField("Status", StringType(), False), StructField("Value", StringType(), True),