Skip to content

Commit

Permalink
feat!: introduce numalogic blocks (#206)
Browse files Browse the repository at this point in the history
- block as a way of abstraction of ML related tasks
- block pipeline to chain multiple blocks together
- support saving/loading of artifacts (only redis registry can support
this)
- improve typing

---------

Signed-off-by: Avik Basu <[email protected]>
  • Loading branch information
ab93 authored Jun 7, 2023
1 parent 4058811 commit 9ebc32f
Show file tree
Hide file tree
Showing 13 changed files with 868 additions and 200 deletions.
6 changes: 3 additions & 3 deletions numalogic/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,10 @@

import numpy.typing as npt
import pytorch_lightning as pl
from sklearn.base import TransformerMixin, BaseEstimator, OutlierMixin
from sklearn.base import TransformerMixin, OutlierMixin


class BaseTransformer(TransformerMixin, BaseEstimator):
class BaseTransformer(TransformerMixin):
"""Base class for all transformer classes."""

pass
Expand Down Expand Up @@ -47,7 +47,7 @@ class TorchModel(pl.LightningModule, metaclass=ABCMeta):
pass


class BaseThresholdModel(OutlierMixin, BaseEstimator):
class BaseThresholdModel(OutlierMixin):
"""Base class for all threshold models."""

pass
29 changes: 29 additions & 0 deletions numalogic/blocks/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
# Copyright 2022 The Numaproj Authors.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""
Module for numalogic blocks which are units of computation that can be
chained together to form a pipeline if needed. A block can be stateful or stateless.
"""

from numalogic.blocks._base import Block
from numalogic.blocks._nn import NNBlock
from numalogic.blocks._transform import PreprocessBlock, PostprocessBlock, ThresholdBlock
from numalogic.blocks.pipeline import BlockPipeline

__all__ = [
"Block",
"NNBlock",
"PreprocessBlock",
"PostprocessBlock",
"ThresholdBlock",
"BlockPipeline",
]
138 changes: 138 additions & 0 deletions numalogic/blocks/_base.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,138 @@
# Copyright 2022 The Numaproj Authors.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from abc import ABCMeta, abstractmethod
from typing import Generic, Union

import numpy.typing as npt

from numalogic.tools.types import artifact_t, state_dict_t


class Block(Generic[artifact_t], metaclass=ABCMeta):
"""
Base class for all blocks.
A block is a unit of computation that can be
chained together to form a pipeline. A block can be stateful or stateless.
A stateful block is one that has a state that can be updated by calling the
block with new data. A stateless block is one that does not have a state and
can be called with new data without any side effects.
A block can be used as a callable. The call method is an alias for the run method.
Args:
----
artifact: The artifact that the block operates on.
name: The name of the block
stateful: Whether the block is stateful or not. (default: True)
"""

__slots__ = ("_name", "_stateful", "_artifact")

def __init__(self, artifact: artifact_t, name: str, stateful: bool = True):
self._artifact = artifact
self._name = name
self._stateful = stateful

@property
def name(self) -> str:
"""The name of the block."""
return self._name

@property
def stateful(self) -> bool:
"""Whether the block is stateful or not."""
return self._stateful

@property
def artifact(self) -> artifact_t:
"""The artifact that the block operates on."""
return self._artifact

@property
def artifact_state(self) -> Union[artifact_t, state_dict_t]:
"""
The state of the artifact that needs to be serialized for saving.
This needs to be overridden if something other than the artifact itself
needs to be serialized, e.g. statedict, or a torchscript module.
"""
return self._artifact

@artifact_state.setter
def artifact_state(self, state: Union[artifact_t, state_dict_t]) -> None:
"""
The state of the artifact that needs to be deserialized for loading.
This needs to be overridden if something other than the artifact itself
needs to be deserialized, e.g. statedict, or a torchscript module.
"""
self._artifact = state

def __call__(self, *args, **kwargs) -> npt.NDArray[float]:
"""Alias for the run method."""
return self.run(*args, **kwargs)

@abstractmethod
def fit(self, data: npt.NDArray[float], *args, **kwargs):
"""
Train the block on the input data.
Implement this method to train the block, using the block's artifact.
Args:
----
data: The input data to train the block on.
*args: Additional arguments for the block.
**kwargs: Additional keyword arguments for fitting the block.
"""
pass

@abstractmethod
def run(self, stream: npt.NDArray[float], *args, **kwargs) -> npt.NDArray[float]:
"""
Run inference on the block on the streaming input data.
Implement this method to run inference on the block,
using the block's artifact.
Args:
----
stream: The streaming input data.
*args: Additional arguments for the block.
**kwargs: Additional keyword arguments for the block.
"""
pass


class StatelessBlock(Block, metaclass=ABCMeta):
"""
Base class for all stateless blocks.
A stateless block is one that does not have a state and
can be called with new data without any side effects.
"""

def __init__(self, artifact: artifact_t, name: str):
super().__init__(artifact, name, stateful=False)

def fit(self, data: npt.NDArray[float], *args, **kwargs) -> npt.NDArray[float]:
"""
A no-op for stateless blocks.
Args:
----
data: The input data to train the block on.
*args: Additional arguments for the block.
**kwargs: Additional keyword arguments for fitting the block.
"""
return self.run(data, *args, **kwargs)
93 changes: 93 additions & 0 deletions numalogic/blocks/_nn.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
# Copyright 2022 The Numaproj Authors.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import torch
from torch.utils.data import DataLoader
import numpy.typing as npt

from numalogic.blocks import Block
from numalogic.models.autoencoder import AutoencoderTrainer
from numalogic.tools.data import StreamingDataset
from numalogic.tools.types import nn_model_t, state_dict_t


class NNBlock(Block):
"""
A block that uses a neural network model to operate on the artifact.
Serialization is done by saving state dict of the model.
Args:
----
model: The neural network model.
seq_len: The sequence length of the input data.
name: The name of the block. Defaults to "nn".
"""

__slots__ = ("seq_len",)

def __init__(self, model: nn_model_t, seq_len: int, name: str = "nn"):
super().__init__(artifact=model, name=name)
self.seq_len = seq_len

@property
def artifact_state(self) -> state_dict_t:
"""The state dict of the model."""
return self._artifact.state_dict()

@artifact_state.setter
def artifact_state(self, artifact_state: state_dict_t) -> None:
"""Set the state dict of the model."""
self._artifact.load_state_dict(artifact_state)

def fit(
self, input_: npt.NDArray[float], batch_size: int = 64, **trainer_kwargs
) -> npt.NDArray[float]:
"""
Train the model on the input data.
Args:
----
input_: The input data.
batch_size: The batch size to use for training.
trainer_kwargs: Keyword arguments to pass to the lightning trainer.
Returns
-------
The error of the model on the input data.
"""
trainer = AutoencoderTrainer(**trainer_kwargs)
ds = StreamingDataset(input_, self.seq_len)
trainer.fit(self._artifact, train_dataloaders=DataLoader(ds, batch_size=batch_size))
reconerr = trainer.predict(
self._artifact, dataloaders=DataLoader(ds, batch_size=batch_size)
)
return reconerr.numpy()

def run(self, input_: npt.NDArray[float], **_) -> npt.NDArray[float]:
"""
Perform forward pass on the streaming input data.
Args:
----
input_: The streaming input data.
Returns
-------
The error of the model on the input data.
"""
input_ = torch.from_numpy(input_).float()
# Add a batch dimension
input_ = torch.unsqueeze(input_, dim=0).contiguous()
self._artifact.eval()
with torch.no_grad():
reconerr = self._artifact.predict_step(input_, batch_idx=0)
return torch.squeeze(reconerr, dim=0).numpy()
Loading

0 comments on commit 9ebc32f

Please sign in to comment.