Merge pull request #1 from gieljnssns/develop

Add a csv prediction function

gieljnssns authored Jan 18, 2024
2 parents 5691360 + 31ef823 commit a7355ce
Showing 6 changed files with 226 additions and 3 deletions.
1 change: 1 addition & 0 deletions .gitignore
@@ -4,6 +4,7 @@ test.py
secrets_emhass.yaml
*.tar
.vscode/*
**/app

# Byte-compiled / optimized / DLL files
__pycache__/
43 changes: 43 additions & 0 deletions src/emhass/command_line.py
@@ -20,6 +20,7 @@
from emhass.forecast import forecast
from emhass.machine_learning_forecaster import mlforecaster
from emhass.optimization import optimization
from emhass.csv_predictor import CsvPredictor
from emhass import utils


@@ -145,6 +146,12 @@ def set_input_data_dict(config_path: pathlib.Path, base_path: str, costfun: str,
var_list = [var_model]
rh.get_data(days_list, var_list)
df_input_data = rh.df_final.copy()
elif set_type == "csv-predict":
df_input_data, df_input_data_dayahead = None, None
P_PV_forecast, P_load_forecast = None, None
days_list = None
params = json.loads(params)

elif set_type == "publish-data":
df_input_data, df_input_data_dayahead = None, None
P_PV_forecast, P_load_forecast = None, None
@@ -423,6 +430,42 @@ def forecast_model_tune(input_data_dict: dict, logger: logging.Logger,
pickle.dump(mlf, outp, pickle.HIGHEST_PROTOCOL)
return df_pred_optim, mlf

def csv_predict(input_data_dict: dict, logger: logging.Logger,
debug: Optional[bool] = False) -> np.ndarray:
"""Perform a prediction from csv file.
:param input_data_dict: A dictionnary with multiple data used by the action functions
:type input_data_dict: dict
:param logger: The passed logger object
:type logger: logging.Logger
:param debug: True to debug, useful for unit testing, defaults to False
:type debug: Optional[bool], optional
:return: The np.ndarray containing the predicted value.
:rtype: np.ndarray
"""
csv_file = input_data_dict['params']['passed_data']['csv_file']
sklearn_model = input_data_dict['params']['passed_data']['sklearn_model']
independent_variables = input_data_dict['params']['passed_data']['independent_variables']
dependent_variable = input_data_dict['params']['passed_data']['dependent_variable']
new_values = input_data_dict['params']['passed_data']['new_values']
root = input_data_dict['root']
# The CSV predictor object
csv = CsvPredictor(csv_file, independent_variables, dependent_variable, sklearn_model, new_values, root, logger)
# Predict from csv file
prediction = csv.predict()

csv_predict_entity_id = input_data_dict['params']['passed_data']['csv_predict_entity_id']
csv_predict_unit_of_measurement = input_data_dict['params']['passed_data']['csv_predict_unit_of_measurement']
csv_predict_friendly_name = input_data_dict['params']['passed_data']['csv_predict_friendly_name']
# Publish prediction
idx = 0
input_data_dict['rh'].post_data(prediction, idx,
csv_predict_entity_id,
csv_predict_unit_of_measurement,
csv_predict_friendly_name,
type_var = 'csv_predictor')
return prediction

def publish_data(input_data_dict: dict, logger: logging.Logger,
save_data_to_file: Optional[bool] = False,
opt_res_latest: Optional[pd.DataFrame] = None) -> pd.DataFrame:
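For orientation, a sketch of the `passed_data` keys that `csv_predict` reads, reusing the docstring's example values. Note that `sklearn_model` is read here but is not set in the csv-predict branch of `utils.treat_runtimeparams` below, so it presumably arrives through the generic model parameter handling that is not shown in this diff:

    # Illustrative only: keys csv_predict expects under input_data_dict['params']['passed_data']
    passed_data = {
        "csv_file": "prediction.csv",                       # csv file inside the root folder
        "sklearn_model": "KNeighborsRegressor",             # LinearRegression, ElasticNet or KNeighborsRegressor
        "independent_variables": ["solar", "degree_days"],  # feature columns of the csv
        "dependent_variable": "hours",                      # target column of the csv
        "new_values": [2.24, 5.68],                         # one value per independent variable
        "csv_predict_entity_id": "sensor.csv_predictor",    # defaults set in utils.treat_runtimeparams
        "csv_predict_unit_of_measurement": None,
        "csv_predict_friendly_name": "Csv predictor",
    }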
135 changes: 135 additions & 0 deletions src/emhass/csv_predictor.py
@@ -0,0 +1,135 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-

import logging
import pathlib
import time
from typing import Tuple
import pandas as pd
import numpy as np

from sklearn.linear_model import LinearRegression
from sklearn.linear_model import ElasticNet
from sklearn.model_selection import train_test_split
from sklearn.neighbors import KNeighborsRegressor

import warnings
warnings.filterwarnings("ignore", category=DeprecationWarning)

class CsvPredictor:
r"""
A prediction class using machine learning models.
The machine learning models are from `scikit-learn`.
It exposes one main method:
- `predict`: to obtain a prediction from a csv file.
"""
def __init__(self, csv_file: str, independent_variables: list, dependent_variable: str, sklearn_model: str, new_values:list, root: str,
logger: logging.Logger) -> None:
r"""Define constructor for the forecast class.
:param csv_file: The name of the csv file to retrieve data from. \
Example: `prediction.csv`.
:type csv_file: str
:param independent_variables: A list of independent variables. \
Example: [`solar`, `degree_days`].
:type independent_variables: list
:param dependent_variable: The dependent variable (to be predicted). \
Example: `hours`.
:type dependent_variable: str
:param sklearn_model: The `scikit-learn` model that will be used. For now only \
these options are possible: `LinearRegression`, `ElasticNet` and `KNeighborsRegressor`.
:type sklearn_model: str
:param new_values: The new values for the independent variables (in the same order as the independent variables list). \
Example: [2.24, 5.68].
:type new_values: list
:param root: The parent folder of the path where the config.yaml file is located
:type root: str
:param logger: The passed logger object
:type logger: logging.Logger
"""
self.csv_file = csv_file
self.independent_variables = independent_variables
self.dependent_variable = dependent_variable
self.sklearn_model = sklearn_model
self.new_values = new_values
self.root = root
self.logger = logger
self.is_tuned = False


def load_data(self) -> pd.DataFrame:
"""Load the data."""
filename_path = pathlib.Path(self.root) / self.csv_file
if filename_path.is_file():
data = pd.read_csv(filename_path)
else:
self.logger.error("The csv file was not found.")
raise ValueError(
"The CSV file " + self.csv_file + " was not found."
)

# Both the feature columns and the target column must be present
required_columns = self.independent_variables + [self.dependent_variable]

if not set(required_columns).issubset(data.columns):
raise ValueError(
f"CSV file should contain the following columns: {', '.join(required_columns)}"
)
return data

def prepare_data(self, data) -> Tuple[np.ndarray, np.ndarray]:
"""
Prepare the data.
:param data: Input Data
:type data: pd.DataFrame
:return: A tuple containing the train data.
:rtype: Tuple[np.ndarray, np.ndarray]
"""
X = data[self.independent_variables].values
y = data[self.dependent_variable].values
# Keep only the training split; the held-out 20% test split is currently unused
X_train, _, y_train, _ = train_test_split(X, y, test_size=0.2, random_state=42)

return X_train, y_train


def predict(self) -> np.ndarray:
r"""The predict method to generate a forecast from a csv file.
:return: The np.ndarray containing the predicted value.
:rtype: np.ndarray
"""
self.logger.info("Performing a prediction for "+self.csv_file)
# Load and prepare the data (load_data raises if the csv file is missing or incomplete)
data = self.load_data()
X, y = self.prepare_data(data)

if self.sklearn_model == 'LinearRegression':
base_model = LinearRegression()
elif self.sklearn_model == 'ElasticNet':
base_model = ElasticNet()
elif self.sklearn_model == 'KNeighborsRegressor':
base_model = KNeighborsRegressor()
else:
self.logger.error("Passed sklearn model "+self.sklearn_model+" is not valid")
raise ValueError("Passed sklearn model "+self.sklearn_model+" is not valid")
# Define the predictor object
self.forecaster = base_model
# Fit and time it
self.logger.info("Predict through a "+self.sklearn_model+" model")
start_time = time.time()
self.forecaster.fit(X, y)
self.logger.info(f"Elapsed time for model fit: {time.time() - start_time}")
new_values = np.array([self.new_values])
prediction = self.forecaster.predict(new_values)

return prediction




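A minimal standalone usage sketch of the class, reusing the docstring's example values (the root path is illustrative):

    import logging

    from emhass.csv_predictor import CsvPredictor

    logger = logging.getLogger(__name__)
    predictor = CsvPredictor(
        csv_file="prediction.csv",                       # expected under the root folder
        independent_variables=["solar", "degree_days"],
        dependent_variable="hours",
        sklearn_model="KNeighborsRegressor",
        new_values=[2.24, 5.68],                         # same order as independent_variables
        root="/app/data",                                # illustrative path
        logger=logger,
    )
    prediction = predictor.predict()                     # np.ndarray with one predicted value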
10 changes: 10 additions & 0 deletions src/emhass/retrieve_hass.py
@@ -284,6 +284,8 @@ def post_data(self, data_df: pd.DataFrame, idx: int, entity_id: str,
state = np.round(data_df.loc[data_df.index[idx]],4)
elif type_var == 'optim_status':
state = data_df.loc[data_df.index[idx]]
elif type_var == 'csv_predictor':
state = data_df[idx]
else:
state = np.round(data_df.loc[data_df.index[idx]],2)
if type_var == 'power':
@@ -315,6 +317,14 @@
"friendly_name": friendly_name
}
}
elif type_var == 'csv_predictor':
data = {
"state": state,
"attributes": {
"unit_of_measurement": unit_of_measurement,
"friendly_name": friendly_name
}
}
else:
data = {
"state": "{:.2f}".format(state),
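The body posted for a `csv_predictor` entity therefore carries the raw predicted value, unlike the other sensor types which round or format the state first. A sketch of the resulting body, with illustrative state and unit values:

    # Sketch of the body built for type_var == 'csv_predictor'
    data = {
        "state": 4.38,                         # prediction[0], posted unrounded
        "attributes": {
            "unit_of_measurement": "h",        # as passed at runtime, None by default
            "friendly_name": "Csv predictor",  # default friendly name
        },
    }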
25 changes: 25 additions & 0 deletions src/emhass/utils.py
@@ -151,6 +151,16 @@ def treat_runtimeparams(runtimeparams: str, params: str, retrieve_hass_conf: dic
freq = int(retrieve_hass_conf['freq'].seconds/60.0)
delta_forecast = int(optim_conf['delta_forecast'].days)
forecast_dates = get_forecast_dates(freq, delta_forecast)
if set_type == "csv-predict":
csv_file = runtimeparams['csv_file']
independent_variables = runtimeparams['independent_variables']
dependent_variable = runtimeparams['dependent_variable']
new_values = runtimeparams['new_values']
params['passed_data']['csv_file'] = csv_file
params['passed_data']['independent_variables'] = independent_variables
params['passed_data']['dependent_variable'] = dependent_variable
params['passed_data']['new_values'] = new_values

# Treating special data passed for MPC control case
if set_type == 'naive-mpc-optim':
if 'prediction_horizon' not in runtimeparams.keys():
@@ -304,6 +314,21 @@ def treat_runtimeparams(runtimeparams: str, params: str, retrieve_hass_conf: dic
else:
model_predict_friendly_name = runtimeparams['model_predict_friendly_name']
params['passed_data']['model_predict_friendly_name'] = model_predict_friendly_name
if 'csv_predict_entity_id' not in runtimeparams.keys():
csv_predict_entity_id = "sensor.csv_predictor"
else:
csv_predict_entity_id = runtimeparams['csv_predict_entity_id']
params['passed_data']['csv_predict_entity_id'] = csv_predict_entity_id
if 'csv_predict_unit_of_measurement' not in runtimeparams.keys():
csv_predict_unit_of_measurement = None
else:
csv_predict_unit_of_measurement = runtimeparams['csv_predict_unit_of_measurement']
params['passed_data']['csv_predict_unit_of_measurement'] = csv_predict_unit_of_measurement
if 'csv_predict_friendly_name' not in runtimeparams.keys():
csv_predict_friendly_name = "Csv predictor"
else:
csv_predict_friendly_name = runtimeparams['csv_predict_friendly_name']
params['passed_data']['csv_predict_friendly_name'] = csv_predict_friendly_name
# Treat optimization configuration parameters passed at runtime
if 'num_def_loads' in runtimeparams.keys():
optim_conf['num_def_loads'] = runtimeparams['num_def_loads']
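Put together, only the four csv keys must be passed at runtime for this action; the three sensor keys are optional and fall back to the defaults above. A sketch of such a runtime parameters payload, reusing the docstring's example values:

    import json

    # Illustrative runtime parameters for the csv-predict action
    runtimeparams = json.dumps({
        "csv_file": "prediction.csv",
        "independent_variables": ["solar", "degree_days"],
        "dependent_variable": "hours",
        "new_values": [2.24, 5.68],
        # Optional keys, shown here with their fallback values
        "csv_predict_entity_id": "sensor.csv_predictor",
        "csv_predict_unit_of_measurement": None,
        "csv_predict_friendly_name": "Csv predictor",
    })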
15 changes: 12 additions & 3 deletions src/emhass/web_server.py
Expand Up @@ -15,6 +15,7 @@
from emhass.command_line import set_input_data_dict
from emhass.command_line import perfect_forecast_optim, dayahead_forecast_optim, naive_mpc_optim
from emhass.command_line import forecast_model_fit, forecast_model_predict, forecast_model_tune
from emhass.command_line import csv_predict
from emhass.command_line import publish_data

# Define the Flask instance
@@ -239,6 +240,11 @@ def action_call(action_name):
pickle.dump(injection_dict, fid)
msg = f'EMHASS >> Action forecast-model-tune executed... \n'
return make_response(msg, 201)
elif action_name == 'csv-predict':
app.logger.info(" >> Performing a csv predict...")
csv_predict(input_data_dict, app.logger)
msg = f'EMHASS >> Action csv-predict executed... \n'
return make_response(msg, 201)
else:
app.logger.error("ERROR: passed action is not valid")
msg = f'EMHASS >> ERROR: Passed action is not valid... \n'
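Assuming the server exposes the usual EMHASS route `/action/<action_name>` on port 5000 and parses the POST body as runtime parameters, the new action could be exercised with a sketch like this (URL and payload values are illustrative):

    import requests

    # Illustrative call to the new csv-predict action
    response = requests.post(
        "http://localhost:5000/action/csv-predict",
        json={
            "csv_file": "prediction.csv",
            "independent_variables": ["solar", "degree_days"],
            "dependent_variable": "hours",
            "new_values": [2.24, 5.68],
        },
    )
    print(response.status_code, response.text)  # expect 201 and the csv-predict message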
@@ -267,9 +267,11 @@ def action_call(action_name):
app.logger.error("options.json does not exists")
DATA_PATH = "/share/" #"/data/"
else:
CONFIG_PATH = os.getenv("CONFIG_PATH", default="/app/config_emhass.yaml")
CONFIG_PATH = os.getenv("CONFIG_PATH", default="/workspaces/emhass/app/config_emhass.yaml")
# CONFIG_PATH = os.getenv("CONFIG_PATH", default="/app/config_emhass.yaml")
options = None
DATA_PATH = os.getenv("DATA_PATH", default="/app/data/")
DATA_PATH = os.getenv("DATA_PATH", default="/workspaces/emhass/app/data/")
# DATA_PATH = os.getenv("DATA_PATH", default="/app/data/")
config_path = Path(CONFIG_PATH)
data_path = Path(DATA_PATH)

@@ -330,7 +338,8 @@ def action_call(action_name):
else:
costfun = os.getenv('LOCAL_COSTFUN', default='profit')
logging_level = os.getenv('LOGGING_LEVEL', default='INFO')
with open(os.getenv('SECRETS_PATH', default='/app/secrets_emhass.yaml'), 'r') as file:
# with open(os.getenv('SECRETS_PATH', default='/app/secrets_emhass.yaml'), 'r') as file:
with open(os.getenv('SECRETS_PATH', default='/workspaces/emhass/app/secrets_emhass.yaml'), 'r') as file:
params_secrets = yaml.load(file, Loader=yaml.FullLoader)
hass_url = params_secrets['hass_url']

