Source connector for ERCOT System Level Data (#583)
* Adding ERCOT Daily Load Source

Signed-off-by: Shivam Saxena <[email protected]>

* Fixing spaces in components.md

Signed-off-by: Shivam Saxena <[email protected]>

* Fixing spaces in mkdocs.yml

Signed-off-by: Shivam Saxena <[email protected]>

* Fixing Pandas Compatibility

Signed-off-by: Shivam Saxena <[email protected]>

* Putting raw HTML response in the test file for ERCOT

Signed-off-by: Shivam Saxena <[email protected]>

* Fixing Sonar Code Smells

Signed-off-by: Shivam Saxena <[email protected]>

* Code cleanups

Signed-off-by: Shivam Saxena <[email protected]>

---------

Signed-off-by: Shivam Saxena <[email protected]>
IW-SS authored Dec 6, 2023
1 parent 285e112 commit dc4aec6
Showing 14 changed files with 1,724 additions and 8 deletions.
@@ -0,0 +1 @@
::: src.sdk.python.rtdip_sdk.pipelines.sources.spark.iso.ercot_daily_load_iso
1 change: 1 addition & 0 deletions docs/sdk/pipelines/components.md
@@ -45,6 +45,7 @@ Sources are components that connect to source systems and extract data from them
|[PJM Historical Load ISO](../code-reference/pipelines/sources/spark/iso/pjm_historical_load_iso.md) ||:heavy_check_mark:|:heavy_check_mark:|:heavy_check_mark:|:heavy_check_mark:|
|[CAISO Daily Load ISO](../code-reference/pipelines/sources/spark/iso/caiso_daily_load_iso.md) ||:heavy_check_mark:|:heavy_check_mark:|:heavy_check_mark:|:heavy_check_mark:|
|[CAISO Historical Load ISO](../code-reference/pipelines/sources/spark/iso/caiso_historical_load_iso.md) ||:heavy_check_mark:|:heavy_check_mark:|:heavy_check_mark:|:heavy_check_mark:|
|[ERCOT Daily Load ISO](../code-reference/pipelines/sources/spark/iso/ercot_daily_load_iso.md) ||:heavy_check_mark:|:heavy_check_mark:|:heavy_check_mark:|:heavy_check_mark:|
|[Weather Forecast API V1](../code-reference/pipelines/sources/spark/the_weather_company/weather_forecast_api_v1.md)||:heavy_check_mark:|:heavy_check_mark:|:heavy_check_mark:|:heavy_check_mark:|
|[Weather Forecast API V1 Multi](../code-reference/pipelines/sources/spark/the_weather_company/weather_forecast_api_v1_multi.md)||:heavy_check_mark:|:heavy_check_mark:|:heavy_check_mark:|:heavy_check_mark:|
!!! note "Note"
5 changes: 3 additions & 2 deletions mkdocs.yml
@@ -171,6 +171,7 @@ nav:
- PJM Historical Load: sdk/code-reference/pipelines/sources/spark/iso/pjm_historical_load_iso.md
- CAISO Daily Load: sdk/code-reference/pipelines/sources/spark/iso/caiso_daily_load_iso.md
- CAISO Historical Load: sdk/code-reference/pipelines/sources/spark/iso/caiso_historical_load_iso.md
- ERCOT Daily Load: sdk/code-reference/pipelines/sources/spark/iso/ercot_daily_load_iso.md
- Weather:
- Base Weather: sdk/code-reference/pipelines/sources/spark/the_weather_company/base_weather.md
- Weather Forecast API V1: sdk/code-reference/pipelines/sources/spark/the_weather_company/weather_forecast_api_v1.md
@@ -243,10 +244,10 @@ nav:
- Queries:
- Functions:
- Metadata: sdk/code-reference/query/functions/metadata.md
- Time Series:
- Time Series:
- Time Series Query Builder: sdk/code-reference/query/functions/time_series/time_series_query_builder.md
- Raw: sdk/code-reference/query/functions/time_series/raw.md
- Latest: sdk/code-reference/query/functions/time_series/latest.md
- Latest: sdk/code-reference/query/functions/time_series/latest.md
- Resample: sdk/code-reference/query/functions/time_series/resample.md
- Interpolate: sdk/code-reference/query/functions/time_series/interpolate.md
- Interpolation at Time: sdk/code-reference/query/functions/time_series/interpolation-at-time.md
17 changes: 17 additions & 0 deletions src/sdk/python/rtdip_sdk/pipelines/_pipeline_utils/iso.py
@@ -65,6 +65,23 @@
]
)

ERCOT_SCHEMA = StructType(
[
StructField("Date", TimestampType(), True),
StructField("HourEnding", StringType(), True),
StructField("Coast", DoubleType(), True),
StructField("East", DoubleType(), True),
StructField("FarWest", DoubleType(), True),
StructField("North", DoubleType(), True),
StructField("NorthCentral", DoubleType(), True),
StructField("SouthCentral", DoubleType(), True),
StructField("Southern", DoubleType(), True),
StructField("West", DoubleType(), True),
StructField("SystemTotal", DoubleType(), True),
StructField("DstFlag", StringType(), True),
]
)


def melt(
df: DataFrame,
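For orientation, a minimal sketch (not part of the diff) of how a schema like this is consumed: a prepared pandas frame whose columns match `ERCOT_SCHEMA` can be promoted to a typed Spark DataFrame in one call. The import path is assumed from the file location above, and the sample row is taken from the test fixture further down.

```python
import pandas as pd
from pyspark.sql import SparkSession

# Assumed import path, mirroring src/sdk/python/rtdip_sdk/pipelines/_pipeline_utils/iso.py
from rtdip_sdk.pipelines._pipeline_utils.iso import ERCOT_SCHEMA

spark = SparkSession.builder.getOrCreate()

# One illustrative row in ERCOT_SCHEMA's column order.
pdf = pd.DataFrame(
    [["2023-11-17", "01:00", 10135.49, 1336.0, 5757.36, 989.29,
      10279.03, 6031.93, 2880.14, 1238.34, 38647.6, "N"]],
    columns=[field.name for field in ERCOT_SCHEMA.fields],
)
pdf["Date"] = pd.to_datetime(pdf["Date"])  # TimestampType expects datetimes

sdf = spark.createDataFrame(pdf, schema=ERCOT_SCHEMA)
sdf.printSchema()
```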
@@ -0,0 +1,27 @@
import os
from tempfile import NamedTemporaryFile


class TempCertFiles(object):
"""
    Generates temporary client certificate files and makes them available for the requests module to use.
"""

def __init__(self, cert, key):
self.cert = cert
self.key = key

def __enter__(self):
with NamedTemporaryFile(mode="wb", delete=False) as cert_file:
cert_file.write(self.cert)
self.cert_file_name = cert_file.name

with NamedTemporaryFile(mode="wb", delete=False) as key_file:
key_file.write(self.key)
self.key_file_name = key_file.name

return self.cert_file_name, self.key_file_name

def __exit__(self, exc_type, exc_value, traceback):
os.remove(self.cert_file_name)
os.remove(self.key_file_name)
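As a quick aside, a hedged sketch of how this context manager is meant to be used with `requests`; the PEM byte strings are placeholders (in the ERCOT source below they come from `pkcs12.load_key_and_certificates`).

```python
import requests

# Placeholder PEM data for illustration only.
cert_bytes = b"-----BEGIN CERTIFICATE-----\n...\n-----END CERTIFICATE-----\n"
key_bytes = b"-----BEGIN PRIVATE KEY-----\n...\n-----END PRIVATE KEY-----\n"

with TempCertFiles(cert_bytes, key_bytes) as cert:
    # __enter__ returns a (cert_path, key_path) tuple, which requests
    # accepts directly as its `cert` argument for mutual TLS.
    response = requests.get("https://mis.ercot.com", cert=cert)

# On exit, both temporary files are removed.
```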
@@ -1,4 +1,5 @@
from .base_iso import BaseISOSource
from .ercot_daily_load_iso import ERCOTDailyLoadISOSource
from .miso_daily_load_iso import MISODailyLoadISOSource
from .miso_historical_load_iso import MISOHistoricalLoadISOSource
from .pjm_daily_load_iso import PJMDailyLoadISOSource
11 changes: 5 additions & 6 deletions src/sdk/python/rtdip_sdk/pipelines/sources/spark/iso/base_iso.py
@@ -13,16 +13,15 @@
# limitations under the License.

import logging
import pandas as pd
from py4j.protocol import Py4JJavaError
from pyspark.sql import DataFrame, SparkSession
import requests
from datetime import datetime, timezone
import pytz
from io import BytesIO

import pandas as pd
import pytz
import requests
from pyspark.sql import DataFrame, SparkSession
from pyspark.sql.types import StructType, StructField, IntegerType
from requests import HTTPError
from io import BytesIO

from ...interfaces import SourceInterface
from ...._pipeline_utils.models import Libraries, SystemType
@@ -0,0 +1,218 @@
# Copyright 2022 RTDIP
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import base64
import logging
from datetime import datetime, timedelta
from io import BytesIO
from typing import List, Tuple
from zipfile import ZipFile

import pandas as pd
import requests
from bs4 import BeautifulSoup
from cryptography.hazmat.primitives import serialization
from cryptography.hazmat.primitives.serialization import pkcs12
from pyspark.sql import SparkSession
from requests import HTTPError

from . import BaseISOSource
from ...._pipeline_utils.iso import ERCOT_SCHEMA
from ...._pipeline_utils.temp_cert_files import TempCertFiles


class ERCOTDailyLoadISOSource(BaseISOSource):
"""
    The ERCOT Daily Load ISO Source is used to read daily load data from ERCOT using web scraping.
It supports actual and forecast data.
<br>API: <a href="https://mis.ercot.com">https://mis.ercot.com</a>
Parameters:
spark (SparkSession): Spark Session instance
options (dict): A dictionary of ISO Source specific configurations (See Attributes table below)
Attributes:
        load_type (str): Must be one of `actual` or `forecast`.
date (str): Must be in `YYYY-MM-DD` format.
certificate_pfx_key (str): The certificate key data or password received from ERCOT.
        certificate_pfx_key_contents (str): The certificate data received from ERCOT; it may be base64 encoded.
    Please check BaseISOSource for available methods.
BaseISOSource:
::: src.sdk.python.rtdip_sdk.pipelines.sources.spark.iso.base_iso
"""

spark: SparkSession
options: dict
url_forecast: str = "https://mis.ercot.com/misapp/GetReports.do?reportTypeId=12312"
url_actual: str = "https://mis.ercot.com/misapp/GetReports.do?reportTypeId=13101"
url_prefix: str = "https://mis.ercot.com"
query_datetime_format: str = "%Y-%m-%d"
required_options = [
"load_type",
"date",
"certificate_pfx_key",
"certificate_pfx_key_contents",
]
spark_schema = ERCOT_SCHEMA
default_query_timezone = "UTC"

def __init__(self, spark: SparkSession, options: dict) -> None:
super().__init__(spark, options)
self.spark = spark
self.options = options
self.load_type = self.options.get("load_type", "actual")
self.date = self.options.get("date", "").strip()
self.certificate_pfx_key = self.options.get("certificate_pfx_key", "").strip()
self.certificate_pfx_key_contents = self.options.get(
"certificate_pfx_key_contents", ""
).strip()

def generate_temp_client_cert_files_from_pfx(self):
password = self.certificate_pfx_key.encode()
        pfx: bytes = base64.b64decode(self.certificate_pfx_key_contents)

        # If the contents do not round-trip through base64, treat them as
        # raw PKCS#12 bytes rather than a base64-encoded string.
        if base64.b64encode(pfx) != self.certificate_pfx_key_contents.encode():
            pfx = self.certificate_pfx_key_contents.encode()

key, cert, _ = pkcs12.load_key_and_certificates(data=pfx, password=password)
key_bytes = key.private_bytes(
encoding=serialization.Encoding.PEM,
format=serialization.PrivateFormat.TraditionalOpenSSL,
encryption_algorithm=serialization.NoEncryption(),
)

cert_bytes = cert.public_bytes(encoding=serialization.Encoding.PEM)
return TempCertFiles(cert_bytes, key_bytes)

def _pull_data(self) -> pd.DataFrame:
"""
Pulls data from the ERCOT API and parses the zip files for CSV data.
Returns:
Raw form of data.
"""

logging.info(f"Getting {self.load_type} data for date {self.date}")
url = self.url_forecast
req_date = datetime.strptime(self.date, self.query_datetime_format)

if self.load_type == "actual":
req_date = req_date + timedelta(days=1)
url = self.url_actual

url_lists, files = self.generate_urls_for_zip(url, req_date)
dfs = []
logging.info(f"Generated {len(url_lists)} URLs - {url_lists}")
logging.info(f"Requesting files - {files}")

for url in url_lists:
df = self.download_zip(url)
dfs.append(df)
final_df = pd.concat(dfs)
return final_df

    def download_zip(self, url: str) -> pd.DataFrame:
logging.info(f"Downloading zip using {url}")
with self.generate_temp_client_cert_files_from_pfx() as cert:
response = requests.get(url, cert=cert)

if not response.content:
raise HTTPError("Empty Response was returned")

logging.info("Unzipping the file")
zf = ZipFile(BytesIO(response.content))
csvs = [s for s in zf.namelist() if ".csv" in s]

if len(csvs) == 0:
raise ValueError("No data was found in the specified interval")

df = pd.read_csv(zf.open(csvs[0]))
return df

    def generate_urls_for_zip(self, url: str, date: datetime) -> Tuple[List[str], List[str]]:
logging.info(f"Finding urls list for date {date}")
with self.generate_temp_client_cert_files_from_pfx() as cert:
page_response = requests.get(url, timeout=5, cert=cert)

page_content = BeautifulSoup(page_response.content, "html.parser")
        zip_info = []
        name_cells = page_content.find_all("td", {"class": "labelOptional_ind"})
        links = page_content.find_all("a")

        for i in range(len(name_cells)):
            zip_name = name_cells[i].text
            zip_link = links[i].get("href")
            zip_info.append((zip_name, zip_link))

date_str = date.strftime("%Y%m%d")
zip_info = list(
filter(
lambda f_info: f_info[0].endswith("csv.zip") and date_str in f_info[0],
zip_info,
)
)

urls = []
files = []

if len(zip_info) == 0:
raise ValueError(f"No file was found for date - {date_str}")

        # As the forecast is generated every hour, pick the latest one.
zip_info = sorted(zip_info, key=lambda item: item[0], reverse=True)
zip_info_item = zip_info[0]

file_name, file_url = zip_info_item
urls.append(self.url_prefix + file_url)
files.append(file_name)

return urls, files

def _prepare_data(self, df: pd.DataFrame) -> pd.DataFrame:
if self.load_type == "actual":
df["Date"] = pd.to_datetime(df["OperDay"], format="%m/%d/%Y")

df = df.rename(
columns={
"COAST": "Coast",
"EAST": "East",
"FAR_WEST": "FarWest",
"NORTH": "North",
"NORTH_C": "NorthCentral",
"SOUTH_C": "SouthCentral",
"SOUTHERN": "Southern",
"WEST": "West",
"TOTAL": "SystemTotal",
"DSTFlag": "DstFlag",
}
)

else:
df = df.rename(columns={"DSTFlag": "DstFlag"})

df["Date"] = pd.to_datetime(df["DeliveryDate"], format="%m/%d/%Y")

return df

def _validate_options(self) -> bool:
try:
datetime.strptime(self.date, self.query_datetime_format)
except ValueError:
raise ValueError(
f"Unable to parse date. Please specify in {self.query_datetime_format} format."
)
return True
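For reference, a hedged end-to-end sketch of running the new source as a batch read. The credentials are placeholders, and the public import path and the `read_batch` entry point are assumed from the SDK's existing `SourceInterface` contract; verify against the installed version.

```python
from pyspark.sql import SparkSession

# Assumed public import path for the new source.
from rtdip_sdk.pipelines.sources import ERCOTDailyLoadISOSource

spark = SparkSession.builder.getOrCreate()

source = ERCOTDailyLoadISOSource(
    spark=spark,
    options={
        "load_type": "actual",
        "date": "2023-11-17",
        # Placeholder credentials issued by ERCOT.
        "certificate_pfx_key": "<pfx-password>",
        "certificate_pfx_key_contents": "<base64-encoded-pfx>",
    },
)

df = source.read_batch()  # returns a Spark DataFrame in ERCOT_SCHEMA
df.show()
```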
@@ -0,0 +1,25 @@
Date,HourEnding,Coast,East,FarWest,North,NorthCentral,SouthCentral,Southern,West,SystemTotal,DstFlag
2023-11-17T00:00:00,01:00,10135.49,1336.0,5757.36,989.29,10279.03,6031.93,2880.14,1238.34,38647.6,N
2023-11-17T00:00:00,02:00,9909.41,1293.77,5710.46,955.6,9907.13,5816.07,2794.81,1223.4,37610.65,N
2023-11-17T00:00:00,03:00,9771.6,1262.84,5715.07,953.4,9715.77,5733.25,2729.48,1184.61,37066.03,N
2023-11-17T00:00:00,04:00,9766.79,1261.93,5717.74,1016.36,9655.56,5707.14,2689.63,1195.55,37010.7,N
2023-11-17T00:00:00,05:00,9915.49,1266.05,5701.25,1080.7,9809.29,5825.19,2779.05,1204.86,37581.87,N
2023-11-17T00:00:00,06:00,10344.26,1351.29,5725.32,1036.34,10405.41,6122.29,2885.79,1269.09,39139.79,N
2023-11-17T00:00:00,07:00,10957.81,1463.42,5749.53,1113.97,11399.31,6698.35,3099.21,1329.38,41810.99,N
2023-11-17T00:00:00,08:00,11183.16,1489.56,5778.87,1143.22,11972.83,6973.01,3167.29,1345.47,43053.4,N
2023-11-17T00:00:00,09:00,11340.07,1508.57,6099.96,1282.17,12170.81,7041.57,3207.91,1336.44,43987.49,N
2023-11-17T00:00:00,10:00,11561.22,1342.13,6213.73,1315.28,12532.28,7008.29,3262.59,1378.89,44614.4,N
2023-11-17T00:00:00,11:00,11780.2,1468.34,6055.92,1310.07,12490.92,6890.16,3260.92,1395.85,44652.37,N
2023-11-17T00:00:00,12:00,12043.4,1581.1,6037.42,1248.41,12471.1,6820.85,3438.22,1348.85,44989.35,N
2023-11-17T00:00:00,13:00,12273.49,1611.23,6080.47,1285.75,12439.01,6855.05,3600.8,1316.79,45462.59,N
2023-11-17T00:00:00,14:00,12644.82,1651.5,6091.49,1287.25,12396.43,7079.92,3808.21,1299.69,46259.31,N
2023-11-17T00:00:00,15:00,12878.62,1626.31,6120.06,1288.4,12317.83,7351.21,3990.87,1316.69,46889.99,N
2023-11-17T00:00:00,16:00,13068.31,1616.88,6155.71,1262.62,12282.92,7631.46,4150.48,1346.38,47514.75,N
2023-11-17T00:00:00,17:00,13039.47,1643.57,6107.02,1238.44,12309.79,7739.6,4000.13,1290.92,47368.94,N
2023-11-17T00:00:00,18:00,12789.54,1670.93,5990.18,1240.34,12653.03,7787.84,3890.75,1269.9,47292.5,N
2023-11-17T00:00:00,19:00,12643.52,1667.72,6133.1,1279.62,12730.57,7832.6,3859.42,1327.01,47473.55,N
2023-11-17T00:00:00,20:00,12286.56,1634.69,6186.14,1275.64,12420.21,7647.22,3676.05,1348.56,46475.08,N
2023-11-17T00:00:00,21:00,11991.18,1576.42,6232.16,1272.85,12179.04,7385.18,3476.33,1274.84,45387.99,N
2023-11-17T00:00:00,22:00,11699.47,1534.36,6193.88,1254.31,11774.59,7141.54,3485.02,1256.44,44339.61,N
2023-11-17T00:00:00,23:00,11283.57,1451.22,6114.48,1216.52,11260.09,6778.77,3365.74,1218.28,42688.68,N
2023-11-17T00:00:00,24:00,10822.67,1382.91,6054.4,1176.03,10624.14,6362.47,3221.17,1139.7,40783.49,N
Binary file not shown.
Binary file not shown.
