Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

release: v0.4.1 #218

Merged
merged 5 commits into from
Jun 20, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ jobs:
strategy:
fail-fast: false
matrix:
python-version: ["3.9", "3.10"]
python-version: ["3.9", "3.10", "3.11"]

steps:
- uses: actions/checkout@v3
Expand Down
16 changes: 16 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,21 @@
# Changelog

## v0.4.0.post1 (2023-06-06)

* [b134fa2](https://github.com/numaproj/numalogic/commit/b134fa27a05667c4a51fc5585d0b26618f0c0139) fix: redis logging (#209)

### Contributors

* Kushal Batra

## v0.4.0 (2023-06-06)

* [bd050c9](https://github.com/numaproj/numalogic/commit/bd050c9b422cc8cc17d004d03c43b21096a21d47) fix: removing logger level setting (#205)

### Contributors

* Kushal Batra

## v0.4a1 (2023-06-02)

* [1f5f458](https://github.com/numaproj/numalogic/commit/1f5f458dab3baf34cb43de7b95e10dc00cc88d71) fix: add caching logs (#203)
Expand Down
45 changes: 45 additions & 0 deletions examples/block_pipeline/Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
####################################################################################################
# builder: install needed dependencies
####################################################################################################

FROM python:3.10-slim-bullseye AS builder

ENV PYTHONFAULTHANDLER=1 \
PYTHONUNBUFFERED=1 \
PYTHONHASHSEED=random \
PIP_NO_CACHE_DIR=on \
PIP_DISABLE_PIP_VERSION_CHECK=on \
PIP_DEFAULT_TIMEOUT=100 \
PYSETUP_PATH="/opt/pysetup" \
VENV_PATH="/opt/pysetup/.venv"

ENV PATH="$VENV_PATH/bin:$PATH"

RUN apt-get update \
&& apt-get install --no-install-recommends -y \
curl \
wget \
# deps for building python deps
build-essential \
&& apt-get install -y git \
&& apt-get clean && rm -rf /var/lib/apt/lists/* \
\
# install dumb-init
&& wget -O /dumb-init https://github.com/Yelp/dumb-init/releases/download/v1.2.5/dumb-init_1.2.5_x86_64 \
&& chmod +x /dumb-init

####################################################################################################
# udf: used for running the udf vertices
####################################################################################################
FROM builder AS udf

WORKDIR $PYSETUP_PATH
COPY ./requirements.txt ./
RUN pip3 install -r requirements.txt

ADD . /app
WORKDIR /app

ENTRYPOINT ["/dumb-init", "--"]

EXPOSE 5000
96 changes: 96 additions & 0 deletions examples/block_pipeline/numa-pl.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
apiVersion: numaflow.numaproj.io/v1alpha1
kind: InterStepBufferService
metadata:
name: redis-isbs # change it
spec:
redis:
native:
version: 7.0.11
affinity:
podAntiAffinity:
preferredDuringSchedulingIgnoredDuringExecution:
- podAffinityTerm:
labelSelector:
matchLabels:
app.kubernetes.io/component: isbsvc
numaflow.numaproj.io/isbsvc-name: redis-isbs # Change it
topologyKey: topology.kubernetes.io/zone
weight: 100
persistence:
accessMode: ReadWriteOnce
volumeSize: 1Gi
settings:
redis: |
maxmemory 4096mb


---
apiVersion: numaflow.numaproj.io/v1alpha1
kind: Pipeline
metadata:
name: blocks
spec:
watermark:
disabled: false
limits:
readBatchSize: 10
bufferMaxLength: 500
bufferUsageLimit: 100
vertices:
- name: in
source:
http: {}
- name: inference
scale:
min: 1
udf:
container:
image: blockpl:v0.0.8
env:
- name: REDIS_AUTH
valueFrom:
secretKeyRef:
name: isbsvc-redis-isbs-redis-auth
key: redis-password
args:
- python
- server.py
- inference
- name: train
scale:
min: 1
udf:
container:
image: blockpl:v0.0.8
env:
- name: REDIS_AUTH
valueFrom:
secretKeyRef:
name: isbsvc-redis-isbs-redis-auth
key: redis-password
args:
- python
- server.py
- train
- name: out
scale:
min: 1
sink:
log: {}
edges:
- from: in
to: inference
- conditions:
tags:
operator: or
values:
- train
from: inference
to: train
- from: inference
to: out
conditions:
tags:
operator: or
values:
- out
4 changes: 4 additions & 0 deletions examples/block_pipeline/requirements.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
cachetools>5.2,<6.0
numalogic[redis,numaflow] @ git+https://github.com/numaproj/numalogic.git@main
# ../../../numalogic[redis,numaflow] # for local testing
pytorch-lightning>2.0,< 3.0
20 changes: 20 additions & 0 deletions examples/block_pipeline/server.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
import sys

from pynumaflow.function import Server
from src import Inference, Train


if __name__ == "__main__":
if len(sys.argv) != 2:
raise ValueError("Please provide a step name")

step = sys.argv[1]
if step == "inference":
step_handler = Inference()
elif step == "train":
step_handler = Train()
else:
raise ValueError(f"Invalid step provided: {step}")

grpc_server = Server(map_handler=step_handler)
grpc_server.start()
19 changes: 19 additions & 0 deletions examples/block_pipeline/src/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
import logging

from src.inference import Inference
from src.train import Train

logging.basicConfig(level=logging.INFO)
LOGGER = logging.getLogger(__name__)
LOGGER.setLevel(logging.INFO)

stream_handler = logging.StreamHandler()
stream_handler.setLevel(logging.INFO)

formatter = logging.Formatter("%(asctime)s - %(levelname)s - %(message)s")
stream_handler.setFormatter(formatter)

LOGGER.addHandler(stream_handler)


__all__ = ["Inference", "Train"]
64 changes: 64 additions & 0 deletions examples/block_pipeline/src/inference.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
import json
import logging

import numpy as np
from numalogic.blocks import (
BlockPipeline,
PreprocessBlock,
NNBlock,
ThresholdBlock,
PostprocessBlock,
)
from numalogic.models.autoencoder.variants import SparseVanillaAE
from numalogic.models.threshold import StdDevThreshold
from numalogic.numaflow import NumalogicUDF
from numalogic.registry import RedisRegistry
from numalogic.tools.exceptions import RedisRegistryError
from numalogic.transforms import TanhNorm
from pynumaflow.function import Messages, Datum, Message
from sklearn.preprocessing import StandardScaler

from src.utils import RedisClient

_LOGGER = logging.getLogger(__name__)


class Inference(NumalogicUDF):
"""UDF to preprocess the input data for ML inference."""

def __init__(self, seq_len: int = 12, num_series: int = 1):
super().__init__()
self.seq_len = seq_len
self.n_features = num_series
self.registry = RedisRegistry(client=RedisClient().get_client())

def exec(self, keys: list[str], datum: Datum) -> Messages:
# Load json data
series = json.loads(datum.value)["data"]

block_pl = BlockPipeline(
PreprocessBlock(StandardScaler()),
NNBlock(
SparseVanillaAE(seq_len=self.seq_len, n_features=self.n_features), self.seq_len
),
ThresholdBlock(StdDevThreshold()),
PostprocessBlock(TanhNorm()),
registry=self.registry,
)

# Load the model from the registry
try:
block_pl.load(skeys=["blockpl"], dkeys=["sparsevanillae"])
except RedisRegistryError as warn:
_LOGGER.warning("Error loading block pipeline: %r", warn)
return Messages(Message(value=b"", tags=["train"]))

# Run inference
try:
output = block_pl(np.asarray(series).reshape(-1, self.n_features))
except Exception as err:
_LOGGER.error("Error running block pipeline: %r", err)
return Messages(Message.to_drop())

anomaly_score = np.mean(output)
return Messages(Message(tags=["out"], value=json.dumps({"score": anomaly_score}).encode()))
64 changes: 64 additions & 0 deletions examples/block_pipeline/src/train.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
import logging

import pandas as pd
from cachetools import TTLCache
from numalogic.blocks import (
BlockPipeline,
PreprocessBlock,
NNBlock,
ThresholdBlock,
PostprocessBlock,
)
from numalogic.models.autoencoder.variants import SparseVanillaAE
from numalogic.models.threshold import StdDevThreshold
from numalogic.numaflow import NumalogicUDF
from numalogic.registry import RedisRegistry
from numalogic.transforms import TanhNorm
from pynumaflow.function import Datum, Messages, Message
from sklearn.preprocessing import StandardScaler

from src.utils import RedisClient, TRAIN_DATA_PATH

_LOGGER = logging.getLogger(__name__)


class Train(NumalogicUDF):
"""UDF to train the model and save it to the registry."""

ttl_cache = TTLCache(maxsize=16, ttl=60)

def __init__(self, seq_len: int = 12, num_series: int = 1):
super().__init__()
self.seq_len = seq_len
self.n_features = num_series
self.registry = RedisRegistry(client=RedisClient().get_client())
self._model_key = "sparsevanillae"

def exec(self, keys: list[str], datum: Datum) -> Messages:
"""The train function here trains the model and saves it to the registry."""
# Check if a training message has been received very recently
if self._model_key in self.ttl_cache:
return Messages(Message.to_drop())
self.ttl_cache[self._model_key] = self._model_key

# Load Training data
data = pd.read_csv(TRAIN_DATA_PATH, index_col=None)

# Define the block pipeline
block_pl = BlockPipeline(
PreprocessBlock(StandardScaler()),
NNBlock(
SparseVanillaAE(seq_len=self.seq_len, n_features=self.n_features), self.seq_len
),
ThresholdBlock(StdDevThreshold()),
PostprocessBlock(TanhNorm()),
registry=self.registry,
)
block_pl.fit(data)

# Save the model to the registry
block_pl.save(skeys=["blockpl"], dkeys=["sparsevanillae"])
_LOGGER.info("Model saved to registry")

# Train vertex is the last vertex in the pipeline
return Messages(Message.to_drop())
42 changes: 42 additions & 0 deletions examples/block_pipeline/src/utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
import logging
import os

from redis.sentinel import Sentinel

from numalogic.tools.types import Singleton, redis_client_t

_LOGGER = logging.getLogger(__name__)
_DIR = os.path.dirname(__file__)
_ROOT_DIR = os.path.split(_DIR)[0]
TRAIN_DATA_PATH = os.path.join(_ROOT_DIR, "resources/train_data.csv")

AUTH = os.getenv("REDIS_AUTH")
HOST = os.getenv("REDIS_HOST", default="isbsvc-redis-isbs-redis-svc.default.svc")
PORT = os.getenv("REDIS_PORT", default="26379")
MASTERNAME = os.getenv("REDIS_MASTER_NAME", default="mymaster")


class RedisClient(metaclass=Singleton):
"""Singleton class to manage redis client."""

_client: redis_client_t = None

def __init__(self):
if not self._client:
self.set_client()

def set_client(self) -> None:
sentinel_args = {
"sentinels": [(HOST, PORT)],
"socket_timeout": 0.1,
}
_LOGGER.info("Connecting to redis sentinel: %s, %s, %s", sentinel_args, MASTERNAME, AUTH)
sentinel = Sentinel(
**sentinel_args,
sentinel_kwargs=dict(password=AUTH),
password=AUTH,
)
self._client = sentinel.master_for(MASTERNAME)

def get_client(self) -> redis_client_t:
return self._client
6 changes: 6 additions & 0 deletions examples/multi_udf/.dockerignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
.git/
__pycache__/
**/__pycache__/
*.py[cod]
*$py.class
.idea/
Loading