Skip to content

Commit

Permalink
Forecasting in EVA (#969)
Browse files Browse the repository at this point in the history
Implemented standalone forecasting in EVA (using
[statsforecast](https://nixtla.github.io/statsforecast) package). You
can run it via the following commands:

```sql
DROP TABLE IF EXISTS AirData;

CREATE TABLE AirData (
    unique_id TEXT(30),
    ds TEXT(30),
    y INTEGER);

LOAD CSV 'data/forecasting/air-passengers.csv' INTO AirData;

DROP UDF IF EXISTS Forecast;

CREATE UDF Forecast
FROM (SELECT unique_id, ds, y FROM AirData)
TYPE Forecasting
'predict' 'y';

SELECT Forecast(12) FROM AirData;
```
Here `Forecast(12)` signifies a horizon length of `12`.

Thanks!

---------

Co-authored-by: xzdandy <[email protected]>
  • Loading branch information
2 people authored and jiashenC committed Sep 5, 2023
1 parent 18f98de commit cadebbd
Show file tree
Hide file tree
Showing 13 changed files with 473 additions and 4 deletions.
2 changes: 1 addition & 1 deletion .circleci/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -435,7 +435,7 @@ jobs:
source test_evadb/bin/activate
pip install --upgrade pip
pip debug --verbose
pip install ".[dev,ludwig,qdrant]"
pip install ".[dev,ludwig,qdrant,forecasting]"
source test_evadb/bin/activate
bash script/test/test.sh -m "<< parameters.mode >>"
Expand Down
145 changes: 145 additions & 0 deletions data/forecasting/air-passengers.csv
Original file line number Diff line number Diff line change
@@ -0,0 +1,145 @@
unique_id,ds,y
AirPassengers,1949-01-01,112
AirPassengers,1949-02-01,118
AirPassengers,1949-03-01,132
AirPassengers,1949-04-01,129
AirPassengers,1949-05-01,121
AirPassengers,1949-06-01,135
AirPassengers,1949-07-01,148
AirPassengers,1949-08-01,148
AirPassengers,1949-09-01,136
AirPassengers,1949-10-01,119
AirPassengers,1949-11-01,104
AirPassengers,1949-12-01,118
AirPassengers,1950-01-01,115
AirPassengers,1950-02-01,126
AirPassengers,1950-03-01,141
AirPassengers,1950-04-01,135
AirPassengers,1950-05-01,125
AirPassengers,1950-06-01,149
AirPassengers,1950-07-01,170
AirPassengers,1950-08-01,170
AirPassengers,1950-09-01,158
AirPassengers,1950-10-01,133
AirPassengers,1950-11-01,114
AirPassengers,1950-12-01,140
AirPassengers,1951-01-01,145
AirPassengers,1951-02-01,150
AirPassengers,1951-03-01,178
AirPassengers,1951-04-01,163
AirPassengers,1951-05-01,172
AirPassengers,1951-06-01,178
AirPassengers,1951-07-01,199
AirPassengers,1951-08-01,199
AirPassengers,1951-09-01,184
AirPassengers,1951-10-01,162
AirPassengers,1951-11-01,146
AirPassengers,1951-12-01,166
AirPassengers,1952-01-01,171
AirPassengers,1952-02-01,180
AirPassengers,1952-03-01,193
AirPassengers,1952-04-01,181
AirPassengers,1952-05-01,183
AirPassengers,1952-06-01,218
AirPassengers,1952-07-01,230
AirPassengers,1952-08-01,242
AirPassengers,1952-09-01,209
AirPassengers,1952-10-01,191
AirPassengers,1952-11-01,172
AirPassengers,1952-12-01,194
AirPassengers,1953-01-01,196
AirPassengers,1953-02-01,196
AirPassengers,1953-03-01,236
AirPassengers,1953-04-01,235
AirPassengers,1953-05-01,229
AirPassengers,1953-06-01,243
AirPassengers,1953-07-01,264
AirPassengers,1953-08-01,272
AirPassengers,1953-09-01,237
AirPassengers,1953-10-01,211
AirPassengers,1953-11-01,180
AirPassengers,1953-12-01,201
AirPassengers,1954-01-01,204
AirPassengers,1954-02-01,188
AirPassengers,1954-03-01,235
AirPassengers,1954-04-01,227
AirPassengers,1954-05-01,234
AirPassengers,1954-06-01,264
AirPassengers,1954-07-01,302
AirPassengers,1954-08-01,293
AirPassengers,1954-09-01,259
AirPassengers,1954-10-01,229
AirPassengers,1954-11-01,203
AirPassengers,1954-12-01,229
AirPassengers,1955-01-01,242
AirPassengers,1955-02-01,233
AirPassengers,1955-03-01,267
AirPassengers,1955-04-01,269
AirPassengers,1955-05-01,270
AirPassengers,1955-06-01,315
AirPassengers,1955-07-01,364
AirPassengers,1955-08-01,347
AirPassengers,1955-09-01,312
AirPassengers,1955-10-01,274
AirPassengers,1955-11-01,237
AirPassengers,1955-12-01,278
AirPassengers,1956-01-01,284
AirPassengers,1956-02-01,277
AirPassengers,1956-03-01,317
AirPassengers,1956-04-01,313
AirPassengers,1956-05-01,318
AirPassengers,1956-06-01,374
AirPassengers,1956-07-01,413
AirPassengers,1956-08-01,405
AirPassengers,1956-09-01,355
AirPassengers,1956-10-01,306
AirPassengers,1956-11-01,271
AirPassengers,1956-12-01,306
AirPassengers,1957-01-01,315
AirPassengers,1957-02-01,301
AirPassengers,1957-03-01,356
AirPassengers,1957-04-01,348
AirPassengers,1957-05-01,355
AirPassengers,1957-06-01,422
AirPassengers,1957-07-01,465
AirPassengers,1957-08-01,467
AirPassengers,1957-09-01,404
AirPassengers,1957-10-01,347
AirPassengers,1957-11-01,305
AirPassengers,1957-12-01,336
AirPassengers,1958-01-01,340
AirPassengers,1958-02-01,318
AirPassengers,1958-03-01,362
AirPassengers,1958-04-01,348
AirPassengers,1958-05-01,363
AirPassengers,1958-06-01,435
AirPassengers,1958-07-01,491
AirPassengers,1958-08-01,505
AirPassengers,1958-09-01,404
AirPassengers,1958-10-01,359
AirPassengers,1958-11-01,310
AirPassengers,1958-12-01,337
AirPassengers,1959-01-01,360
AirPassengers,1959-02-01,342
AirPassengers,1959-03-01,406
AirPassengers,1959-04-01,396
AirPassengers,1959-05-01,420
AirPassengers,1959-06-01,472
AirPassengers,1959-07-01,548
AirPassengers,1959-08-01,559
AirPassengers,1959-09-01,463
AirPassengers,1959-10-01,407
AirPassengers,1959-11-01,362
AirPassengers,1959-12-01,405
AirPassengers,1960-01-01,417
AirPassengers,1960-02-01,391
AirPassengers,1960-03-01,419
AirPassengers,1960-04-01,461
AirPassengers,1960-05-01,472
AirPassengers,1960-06-01,535
AirPassengers,1960-07-01,622
AirPassengers,1960-08-01,606
AirPassengers,1960-09-01,508
AirPassengers,1960-10-01,461
AirPassengers,1960-11-01,390
AirPassengers,1960-12-01,432
2 changes: 2 additions & 0 deletions docs/_toc.yml
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,8 @@ parts:
sections:
- file: source/reference/ai/model-train
title: Model Training
- file: source/reference/udfs/model-forecasting
title: Time Series Forecasting
- file: source/reference/ai/hf
title: Hugging Face
- file: source/reference/ai/openai
Expand Down
1 change: 1 addition & 0 deletions docs/source/reference/ai/custom.rst
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
.. _udf:


Functions
======================

Expand Down
37 changes: 37 additions & 0 deletions docs/source/reference/udfs/model-forecasting.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
Time Series Forecasting
========================

You can train a forecasting model easily in EvaDB.

.. note::

Install `statsforecast` in your EvaDB virtual environment: ``pip install statsforecast``.

First, we create a table to insert required data.

.. code-block:: sql
CREATE TABLE AirData (
unique_id TEXT(30),
ds TEXT(30),
y INTEGER);
LOAD CSV 'data/forecasting/air-passengers.csv' INTO AirData;
Next, we create a UDF of `TYPE Forecasting`. We must enter the column name on which we wish to forecast using `predict`. Other options include `id` and `time` (they represent the unique id of the items and the time data if available).

.. code-block:: sql
CREATE UDF IF NOT EXISTS Forecast FROM
(SELECT y FROM AirData)
TYPE Forecasting
'predict' 'y';
This trains a forecasting model. The model can be called by providing the horizon for forecasting.

.. code-block:: sql
SELECT Forecast(12) FROM AirData;
Here, the horizon is `12`.
1 change: 1 addition & 0 deletions docs/source/usecases/question-answering.rst
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ EvaDB has built-in support for ``ChatGPT`` function from ``OpenAI``. You will ne

EvaDB has built-in support for a wide range of :ref:`OpenAI<openai>` models. You can also switch to another large language models that runs locally by defining a :ref:`Custom function<udf>`.


ChatGPT function is a wrapper around OpenAI API call. You can also switch to other LLM models that can run locally.

Convert Speech to Text
Expand Down
5 changes: 4 additions & 1 deletion evadb/binder/statement_binder.py
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,10 @@ def _bind_create_function_statement(self, node: CreateFunctionStatement):
inputs, outputs = [], []
for column in all_column_list:
if column.name in predict_columns:
column.name = column.name + "_predictions"
if node.udf_type != "Forecasting":
column.name = column.name + "_predictions"
else:
column.name = column.name
outputs.append(column)
else:
inputs.append(column)
Expand Down
118 changes: 118 additions & 0 deletions evadb/executor/create_udf_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,9 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import hashlib
import os
import pickle
from pathlib import Path
from typing import Dict, List

Expand All @@ -35,6 +37,7 @@
from evadb.utils.errors import UDFIODefinitionError
from evadb.utils.generic_utils import (
load_udf_class_from_file,
try_to_import_forecast,
try_to_import_ludwig,
try_to_import_torch,
try_to_import_ultralytics,
Expand Down Expand Up @@ -128,6 +131,119 @@ def handle_ultralytics_udf(self):
self.node.metadata,
)

def handle_forecasting_udf(self):
"""Handle forecasting UDFs"""
aggregated_batch_list = []
child = self.children[0]
for batch in child.exec():
aggregated_batch_list.append(batch)
aggregated_batch = Batch.concat(aggregated_batch_list, copy=False)
aggregated_batch.drop_column_alias()

arg_map = {arg.key: arg.value for arg in self.node.metadata}
if not self.node.impl_path:
impl_path = Path(f"{self.udf_dir}/forecast.py").absolute().as_posix()
else:
impl_path = self.node.impl_path.absolute().as_posix()
arg_map = {arg.key: arg.value for arg in self.node.metadata}

if "model" not in arg_map.keys():
arg_map["model"] = "AutoARIMA"
if "frequency" not in arg_map.keys():
arg_map["frequency"] = "M"

model_name = arg_map["model"]
frequency = arg_map["frequency"]

data = aggregated_batch.frames.rename(columns={arg_map["predict"]: "y"})
if "time" in arg_map.keys():
aggregated_batch.frames.rename(columns={arg_map["time"]: "ds"})
if "id" in arg_map.keys():
aggregated_batch.frames.rename(columns={arg_map["id"]: "unique_id"})

if "unique_id" not in list(data.columns):
data["unique_id"] = ["test" for x in range(len(data))]

if "ds" not in list(data.columns):
data["ds"] = [x + 1 for x in range(len(data))]

try_to_import_forecast()
from statsforecast import StatsForecast
from statsforecast.models import AutoARIMA, AutoCES, AutoETS, AutoTheta

model_dict = {
"AutoARIMA": AutoARIMA,
"AutoCES": AutoCES,
"AutoETS": AutoETS,
"AutoTheta": AutoTheta,
}

season_dict = { # https://pandas.pydata.org/docs/user_guide/timeseries.html#timeseries-offset-aliases
"H": 24,
"M": 12,
"Q": 4,
"SM": 24,
"BM": 12,
"BMS": 12,
"BQ": 4,
"BH": 24,
}

new_freq = (
frequency.split("-")[0] if "-" in frequency else frequency
) # shortens longer frequencies like Q-DEC
season_length = season_dict[new_freq] if new_freq in season_dict else 1
model = StatsForecast(
[model_dict[model_name](season_length=season_length)], freq=new_freq
)

model_dir = os.path.join(
self.db.config.get_value("storage", "model_dir"), self.node.name
)
Path(model_dir).mkdir(parents=True, exist_ok=True)
model_path = os.path.join(
self.db.config.get_value("storage", "model_dir"),
self.node.name,
str(hashlib.sha256(data.to_string().encode()).hexdigest()) + ".pkl",
)

weight_file = Path(model_path)

if not weight_file.exists():
model.fit(data)
f = open(model_path, "wb")
pickle.dump(model, f)
f.close()

arg_map_here = {"model_name": model_name, "model_path": model_path}
udf = self._try_initializing_udf(impl_path, arg_map_here)
io_list = self._resolve_udf_io(udf)

metadata_here = [
UdfMetadataCatalogEntry(
key="model_name",
value=model_name,
udf_id=None,
udf_name=None,
row_id=None,
),
UdfMetadataCatalogEntry(
key="model_path",
value=model_path,
udf_id=None,
udf_name=None,
row_id=None,
),
]

return (
self.node.name,
impl_path,
self.node.udf_type,
io_list,
metadata_here,
)

def handle_generic_udf(self):
"""Handle generic UDFs
Expand Down Expand Up @@ -168,6 +284,8 @@ def exec(self, *args, **kwargs):
name, impl_path, udf_type, io_list, metadata = self.handle_ultralytics_udf()
elif self.node.udf_type == "Ludwig":
name, impl_path, udf_type, io_list, metadata = self.handle_ludwig_udf()
elif self.node.udf_type == "Forecasting":
name, impl_path, udf_type, io_list, metadata = self.handle_forecasting_udf()
else:
name, impl_path, udf_type, io_list, metadata = self.handle_generic_udf()

Expand Down
Loading

0 comments on commit cadebbd

Please sign in to comment.