v0.9.8 #630

Merged · 2 commits · Jan 11, 2024
Changes from all commits
@@ -0,0 +1 @@
::: src.sdk.python.rtdip_sdk.pipelines.transformers.spark.iso.caiso_to_mdm
1 change: 1 addition & 0 deletions docs/sdk/pipelines/components.md
@@ -70,6 +70,7 @@ Transformers are components that perform transformations on data. These will tar
|[Raw Forecast to Weather Data Model](../code-reference/pipelines/transformers/spark/the_weather_company/raw_forecast_to_weather_data_model.md)||:heavy_check_mark:|:heavy_check_mark:|:heavy_check_mark:|:heavy_check_mark:|
|[PJM To Meters Data Model](../code-reference/pipelines/transformers/spark/iso/pjm_to_mdm.md)||:heavy_check_mark:|:heavy_check_mark:|:heavy_check_mark:|:heavy_check_mark:|
|[ERCOT To Meters Data Model](../code-reference/pipelines/transformers/spark/iso/ercot_to_mdm.md)||:heavy_check_mark:|:heavy_check_mark:|:heavy_check_mark:|:heavy_check_mark:|
|[CAISO To Meters Data Model](../code-reference/pipelines/transformers/spark/iso/caiso_to_mdm.md)||:heavy_check_mark:|:heavy_check_mark:|:heavy_check_mark:|:heavy_check_mark:|

!!! note "Note"
    This list will dynamically change as the framework is further developed and new components are added.
1 change: 1 addition & 0 deletions mkdocs.yml
@@ -203,6 +203,7 @@ nav:
  - MISO To Meters Data Model: sdk/code-reference/pipelines/transformers/spark/iso/miso_to_mdm.md
  - PJM To Meters Data Model: sdk/code-reference/pipelines/transformers/spark/iso/pjm_to_mdm.md
  - ERCOT To Meters Data Model: sdk/code-reference/pipelines/transformers/spark/iso/ercot_to_mdm.md
  - CAISO To Meters Data Model: sdk/code-reference/pipelines/transformers/spark/iso/caiso_to_mdm.md
- The Weather Company:
  - Raw Forecast To Weather Data Model: sdk/code-reference/pipelines/transformers/spark/the_weather_company/raw_forecast_to_weather_data_model.md
- ECMWF:
24 changes: 0 additions & 24 deletions src/sdk/python/rtdip_sdk/pipelines/_pipeline_utils/spark.py
@@ -610,27 +610,3 @@ def get_dbutils(
StructField("sourceName", StringType(), True),
]
)

MIRICO_SCHEMA = StructType(
    [
        StructField("retroName", StringType(), True),
        StructField("temperatureKelvin", FloatType(), True),
        StructField("siteName", StringType(), True),
        StructField("pressureMillibar", FloatType(), True),
        StructField("windSpeedMetersPerSecond", FloatType(), True),
        StructField("windBearingDegreesTo", FloatType(), True),
        StructField("pathLengthMeters", FloatType(), True),
        StructField("retroAltitude", FloatType(), True),
        StructField("sensorAltitude", FloatType(), True),
        StructField("quality", IntegerType(), True),
        StructField("timeStamp", StringType(), True),
        StructField("siteKey", StringType(), True),
        StructField("gasTypeId", IntegerType(), True),
        StructField("retroLongitude", FloatType(), True),
        StructField("gasType", StringType(), True),
        StructField("sensorLatitude", FloatType(), True),
        StructField("gasPpm", FloatType(), True),
        StructField("retroLatitude", FloatType(), True),
        StructField("sensorLongitude", FloatType(), True),
    ]
)
@@ -1,3 +1,4 @@
from .caiso_to_mdm import CAISOToMDMTransformer
from .ercot_to_mdm import ERCOTToMDMTransformer
from .miso_to_mdm import MISOToMDMTransformer
from .pjm_to_mdm import PJMToMDMTransformer
@@ -0,0 +1,72 @@
# Copyright 2022 RTDIP
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from pyspark.sql import DataFrame, SparkSession

from ..base_raw_to_mdm import BaseRawToMDMTransformer
from ...._pipeline_utils.iso import CAISO_SCHEMA
from .....data_models.timeseries import SeriesType, ModelType, ValueType


class CAISOToMDMTransformer(BaseRawToMDMTransformer):
    """
    Converts CAISO Raw data into Meters Data Model.

    Please check the BaseRawToMDMTransformer for the required arguments and methods.

    Example
    --------
    ```python
    from rtdip_sdk.pipelines.transformers import CAISOToMDMTransformer
    from rtdip_sdk.pipelines.utilities import SparkSessionUtility

    # Not required if using Databricks
    spark = SparkSessionUtility(config={}).execute()

    caiso_to_mdm_transformer = CAISOToMDMTransformer(
        spark=spark,
        data=df,
        output_type="usage",
        name=None,
        description=None,
        value_type=None,
        version=None,
        series_id=None,
        series_parent_id=None
    )

    result = caiso_to_mdm_transformer.transform()
    ```
    """

    spark: SparkSession
    data: DataFrame
    input_schema = CAISO_SCHEMA
    uid_col = "TacAreaName"
    series_id_col = "'series_std_001'"
    timestamp_col = "to_timestamp(StartTime)"
    interval_timestamp_col = "Timestamp + INTERVAL 1 HOURS"
    value_col = "Load"
    series_parent_id_col = "'series_parent_std_001'"
    name_col = "'CAISO API'"
    uom_col = "'mwh'"
    description_col = "'CAISO data pulled from CAISO ISO API'"
    timestamp_start_col = "StartTime"
    timestamp_end_col = "StartTime + INTERVAL 1 HOURS"
    time_zone_col = "'PST'"
    version_col = "'1'"
    series_type = SeriesType.Hour
    model_type = ModelType.Default
    value_type = ValueType.Usage
    properties_col = "null"
@@ -23,13 +23,13 @@
    map_keys,
    map_values,
    concat_ws,
    to_timestamp,
)
from ...._sdk_utils.compare_versions import (
    _package_version_meets_minimum,
)
from ..interfaces import TransformerInterface
from ..._pipeline_utils.models import Libraries, SystemType
from ..._pipeline_utils.spark import SEM_SCHEMA
from ..._pipeline_utils import mirico_field_mappings


@@ -116,7 +116,7 @@ def transform(self) -> DataFrame:
.withColumn("Value", map_values("body"))
.select(
map_from_arrays("TagName", "Value").alias("x"),
col("x.timeStamp").alias("EventTime"),
to_timestamp(col("x.timeStamp")).alias("EventTime"),
col("x.siteName").alias("SiteName"),
)
.select("EventTime", "SiteName", posexplode("x"))
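The only functional change in this transformer is that `timeStamp` is now parsed with `to_timestamp`, so `EventTime` is emitted as a `TimestampType` column instead of a string (the test change at the end of this diff mirrors that). A small, self-contained illustration of the same conversion, using a made-up single-column frame:

```python
# Standalone illustration of the EventTime change: to_timestamp parses the ISO-8601
# string into a TimestampType column. The DataFrame here is a made-up example.
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, to_timestamp

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame([("2023-11-03T16:21:16",)], ["timeStamp"])

parsed = df.select(to_timestamp(col("timeStamp")).alias("EventTime"))
parsed.printSchema()  # EventTime: timestamp (nullable = true)
```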
@@ -0,0 +1,79 @@
# Copyright 2022 RTDIP
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import os

from pyspark.sql import SparkSession, DataFrame

from src.sdk.python.rtdip_sdk.pipelines._pipeline_utils.iso import CAISO_SCHEMA
from src.sdk.python.rtdip_sdk.pipelines._pipeline_utils.mdm import (
    MDM_USAGE_SCHEMA,
    MDM_META_SCHEMA,
)
from src.sdk.python.rtdip_sdk.pipelines._pipeline_utils.models import (
    Libraries,
    SystemType,
)
from src.sdk.python.rtdip_sdk.pipelines.transformers import CAISOToMDMTransformer

parent_base_path: str = os.path.join(
    os.path.dirname(os.path.realpath(__file__)), "test_data"
)


def caiso_to_mdm_test(
    spark_session: SparkSession,
    output_type: str,
    expected_df: DataFrame,
    base_path: str,
):
    input_df: DataFrame = spark_session.read.csv(
        f"{base_path}/input.csv", header=True, schema=CAISO_SCHEMA
    )

    transformer = CAISOToMDMTransformer(
        spark_session, input_df, output_type=output_type
    )
    assert transformer.system_type() == SystemType.PYSPARK
    assert isinstance(transformer.libraries(), Libraries)
    assert transformer.settings() == dict()

    actual_df = transformer.transform()

    cols = list(
        filter(
            lambda column: column in expected_df.columns,
            ["Uid", "Timestamp", "TimestampStart"],
        )
    )
    assert actual_df.orderBy(cols).collect() == expected_df.orderBy(cols).collect()


def test_caiso_to_mdm_usage(spark_session: SparkSession):
    usage_path = os.path.join(parent_base_path, "caiso_usage")
    usage_expected_df: DataFrame = spark_session.read.csv(
        f"{usage_path}/output.csv", header=True, schema=MDM_USAGE_SCHEMA
    )

    caiso_to_mdm_test(spark_session, "usage", usage_expected_df, usage_path)


def test_caiso_to_mdm_meta(spark_session: SparkSession):
    meta_path: str = os.path.join(parent_base_path, "caiso_meta")

    meta_expected_df: DataFrame = spark_session.read.json(
        f"{meta_path}/output.json", schema=MDM_META_SCHEMA
    )

    caiso_to_mdm_test(spark_session, "meta", meta_expected_df, meta_path)
@@ -0,0 +1,5 @@
StartTime,EndTime,LoadType,OprDt,OprHr,OprInterval,MarketRunId,TacAreaName,Label,XmlDataItem,Pos,Load,ExecutionType,Group
2023-12-13T07:00:00,2023-12-13T08:00:00,0,2023-12-12,24,0,ACTUAL,SDGE-TAC,Total Actual Hourly Integrated Load,SYS_FCST_ACT_MW,3.4,2077.0,ACTUAL,95
2023-12-13T01:00:00,2023-12-13T02:00:00,0,2023-12-12,18,0,ACTUAL,SDGE-TAC,Total Actual Hourly Integrated Load,SYS_FCST_ACT_MW,3.4,2758.0,ACTUAL,95
2023-12-13T07:00:00,2023-12-13T08:00:00,0,2023-12-12,24,0,ACTUAL,SRP,Total Actual Hourly Integrated Load,SYS_FCST_ACT_MW,3.8,2900.0,ACTUAL,96
2023-12-13T03:00:00,2023-12-13T04:00:00,0,2023-12-12,20,0,ACTUAL,SRP,Total Actual Hourly Integrated Load,SYS_FCST_ACT_MW,3.8,3342.0,ACTUAL,96
@@ -0,0 +1,4 @@
{"Uid":"SDGE-TAC","SeriesId":"series_std_001","SeriesParentId":"series_parent_std_001","Name":"CAISO API","Uom":"mwh","Description":"CAISO data pulled from CAISO ISO API","TimestampStart":"2023-12-13T07:00:00","TimestampEnd":"2023-12-13T08:00:00","Timezone":"PST","Version":"1","SeriesType":64,"ModelType":1,"ValueType":16}
{"Uid":"SDGE-TAC","SeriesId":"series_std_001","SeriesParentId":"series_parent_std_001","Name":"CAISO API","Uom":"mwh","Description":"CAISO data pulled from CAISO ISO API","TimestampStart":"2023-12-13T01:00:00","TimestampEnd":"2023-12-13T02:00:00","Timezone":"PST","Version":"1","SeriesType":64,"ModelType":1,"ValueType":16}
{"Uid":"SRP","SeriesId":"series_std_001","SeriesParentId":"series_parent_std_001","Name":"CAISO API","Uom":"mwh","Description":"CAISO data pulled from CAISO ISO API","TimestampStart":"2023-12-13T07:00:00","TimestampEnd":"2023-12-13T08:00:00","Timezone":"PST","Version":"1","SeriesType":64,"ModelType":1,"ValueType":16}
{"Uid":"SRP","SeriesId":"series_std_001","SeriesParentId":"series_parent_std_001","Name":"CAISO API","Uom":"mwh","Description":"CAISO data pulled from CAISO ISO API","TimestampStart":"2023-12-13T03:00:00","TimestampEnd":"2023-12-13T04:00:00","Timezone":"PST","Version":"1","SeriesType":64,"ModelType":1,"ValueType":16}
@@ -0,0 +1,5 @@
StartTime,EndTime,LoadType,OprDt,OprHr,OprInterval,MarketRunId,TacAreaName,Label,XmlDataItem,Pos,Load,ExecutionType,Group
2023-12-13T07:00:00,2023-12-13T08:00:00,0,2023-12-12,24,0,ACTUAL,SDGE-TAC,Total Actual Hourly Integrated Load,SYS_FCST_ACT_MW,3.4,2077.0,ACTUAL,95
2023-12-13T01:00:00,2023-12-13T02:00:00,0,2023-12-12,18,0,ACTUAL,SDGE-TAC,Total Actual Hourly Integrated Load,SYS_FCST_ACT_MW,3.4,2758.0,ACTUAL,95
2023-12-13T07:00:00,2023-12-13T08:00:00,0,2023-12-12,24,0,ACTUAL,SRP,Total Actual Hourly Integrated Load,SYS_FCST_ACT_MW,3.8,2900.0,ACTUAL,96
2023-12-13T03:00:00,2023-12-13T04:00:00,0,2023-12-12,20,0,ACTUAL,SRP,Total Actual Hourly Integrated Load,SYS_FCST_ACT_MW,3.8,3342.0,ACTUAL,96
@@ -0,0 +1,5 @@
Uid,SeriesId,Timestamp,IntervalTimestamp,Value
SDGE-TAC,series_std_001,2023-12-13T07:00:00,2023-12-13T08:00:00,2077.0
SDGE-TAC,series_std_001,2023-12-13T01:00:00,2023-12-13T02:00:00,2758.0
SRP,series_std_001,2023-12-13T07:00:00,2023-12-13T08:00:00,2900.0
SRP,series_std_001,2023-12-13T03:00:00,2023-12-13T04:00:00,3342.0
@@ -31,7 +31,7 @@
    _package_version_meets_minimum,
)

EVENTTIME = "2023-11-03T16:21:16"
EVENTTIME = datetime.fromisoformat("2023-11-03T16:21:16")


def test_mirico_json_to_pcdm(spark_session: SparkSession):
@@ -40,7 +40,7 @@ def test_mirico_json_to_pcdm(spark_session: SparkSession):

    expected_schema = StructType(
        [
            StructField("EventTime", StringType(), True),
            StructField("EventTime", TimestampType(), True),
            StructField("TagName", StringType(), False),
            StructField("Status", StringType(), False),
            StructField("Value", StringType(), True),