Skip to content

Commit

Permalink
examples: pipeline using block (#216)
Browse files Browse the repository at this point in the history
Signed-off-by: Avik Basu <[email protected]>
  • Loading branch information
ab93 committed Jun 20, 2023
1 parent c41d0d6 commit 6b1e6a6
Show file tree
Hide file tree
Showing 27 changed files with 4,360 additions and 0 deletions.
File renamed without changes.
45 changes: 45 additions & 0 deletions examples/block_pipeline/Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
####################################################################################################
# builder: install needed dependencies
####################################################################################################

FROM python:3.10-slim-bullseye AS builder

ENV PYTHONFAULTHANDLER=1 \
PYTHONUNBUFFERED=1 \
PYTHONHASHSEED=random \
PIP_NO_CACHE_DIR=on \
PIP_DISABLE_PIP_VERSION_CHECK=on \
PIP_DEFAULT_TIMEOUT=100 \
PYSETUP_PATH="/opt/pysetup" \
VENV_PATH="/opt/pysetup/.venv"

ENV PATH="$VENV_PATH/bin:$PATH"

RUN apt-get update \
&& apt-get install --no-install-recommends -y \
curl \
wget \
# deps for building python deps
build-essential \
&& apt-get install -y git \
&& apt-get clean && rm -rf /var/lib/apt/lists/* \
\
# install dumb-init
&& wget -O /dumb-init https://github.com/Yelp/dumb-init/releases/download/v1.2.5/dumb-init_1.2.5_x86_64 \
&& chmod +x /dumb-init

####################################################################################################
# udf: used for running the udf vertices
####################################################################################################
FROM builder AS udf

WORKDIR $PYSETUP_PATH
COPY ./requirements.txt ./
RUN pip3 install -r requirements.txt

ADD . /app
WORKDIR /app

ENTRYPOINT ["/dumb-init", "--"]

EXPOSE 5000
96 changes: 96 additions & 0 deletions examples/block_pipeline/numa-pl.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
apiVersion: numaflow.numaproj.io/v1alpha1
kind: InterStepBufferService
metadata:
name: redis-isbs # change it
spec:
redis:
native:
version: 7.0.11
affinity:
podAntiAffinity:
preferredDuringSchedulingIgnoredDuringExecution:
- podAffinityTerm:
labelSelector:
matchLabels:
app.kubernetes.io/component: isbsvc
numaflow.numaproj.io/isbsvc-name: redis-isbs # Change it
topologyKey: topology.kubernetes.io/zone
weight: 100
persistence:
accessMode: ReadWriteOnce
volumeSize: 1Gi
settings:
redis: |
maxmemory 4096mb
---
apiVersion: numaflow.numaproj.io/v1alpha1
kind: Pipeline
metadata:
name: blocks
spec:
watermark:
disabled: false
limits:
readBatchSize: 10
bufferMaxLength: 500
bufferUsageLimit: 100
vertices:
- name: in
source:
http: {}
- name: inference
scale:
min: 1
udf:
container:
image: blockpl:v0.0.8
env:
- name: REDIS_AUTH
valueFrom:
secretKeyRef:
name: isbsvc-redis-isbs-redis-auth
key: redis-password
args:
- python
- server.py
- inference
- name: train
scale:
min: 1
udf:
container:
image: blockpl:v0.0.8
env:
- name: REDIS_AUTH
valueFrom:
secretKeyRef:
name: isbsvc-redis-isbs-redis-auth
key: redis-password
args:
- python
- server.py
- train
- name: out
scale:
min: 1
sink:
log: {}
edges:
- from: in
to: inference
- conditions:
tags:
operator: or
values:
- train
from: inference
to: train
- from: inference
to: out
conditions:
tags:
operator: or
values:
- out
4 changes: 4 additions & 0 deletions examples/block_pipeline/requirements.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
cachetools>5.2,<6.0
numalogic[redis,numaflow] @ git+https://github.com/numaproj/numalogic.git@main
# ../../../numalogic[redis,numaflow] # for local testing
pytorch-lightning>2.0,< 3.0
20 changes: 20 additions & 0 deletions examples/block_pipeline/server.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
import sys

from pynumaflow.function import Server
from src import Inference, Train


if __name__ == "__main__":
if len(sys.argv) != 2:
raise ValueError("Please provide a step name")

step = sys.argv[1]
if step == "inference":
step_handler = Inference()
elif step == "train":
step_handler = Train()
else:
raise ValueError(f"Invalid step provided: {step}")

grpc_server = Server(map_handler=step_handler)
grpc_server.start()
19 changes: 19 additions & 0 deletions examples/block_pipeline/src/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
import logging

from src.inference import Inference
from src.train import Train

logging.basicConfig(level=logging.INFO)
LOGGER = logging.getLogger(__name__)
LOGGER.setLevel(logging.INFO)

stream_handler = logging.StreamHandler()
stream_handler.setLevel(logging.INFO)

formatter = logging.Formatter("%(asctime)s - %(levelname)s - %(message)s")
stream_handler.setFormatter(formatter)

LOGGER.addHandler(stream_handler)


__all__ = ["Inference", "Train"]
64 changes: 64 additions & 0 deletions examples/block_pipeline/src/inference.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
import json
import logging

import numpy as np
from numalogic.blocks import (
BlockPipeline,
PreprocessBlock,
NNBlock,
ThresholdBlock,
PostprocessBlock,
)
from numalogic.models.autoencoder.variants import SparseVanillaAE
from numalogic.models.threshold import StdDevThreshold
from numalogic.numaflow import NumalogicUDF
from numalogic.registry import RedisRegistry
from numalogic.tools.exceptions import RedisRegistryError
from numalogic.transforms import TanhNorm
from pynumaflow.function import Messages, Datum, Message
from sklearn.preprocessing import StandardScaler

from src.utils import RedisClient

_LOGGER = logging.getLogger(__name__)


class Inference(NumalogicUDF):
"""UDF to preprocess the input data for ML inference."""

def __init__(self, seq_len: int = 12, num_series: int = 1):
super().__init__()
self.seq_len = seq_len
self.n_features = num_series
self.registry = RedisRegistry(client=RedisClient().get_client())

def exec(self, keys: list[str], datum: Datum) -> Messages:
# Load json data
series = json.loads(datum.value)["data"]

block_pl = BlockPipeline(
PreprocessBlock(StandardScaler()),
NNBlock(
SparseVanillaAE(seq_len=self.seq_len, n_features=self.n_features), self.seq_len
),
ThresholdBlock(StdDevThreshold()),
PostprocessBlock(TanhNorm()),
registry=self.registry,
)

# Load the model from the registry
try:
block_pl.load(skeys=["blockpl"], dkeys=["sparsevanillae"])
except RedisRegistryError as warn:
_LOGGER.warning("Error loading block pipeline: %r", warn)
return Messages(Message(value=b"", tags=["train"]))

# Run inference
try:
output = block_pl(np.asarray(series).reshape(-1, self.n_features))
except Exception as err:
_LOGGER.error("Error running block pipeline: %r", err)
return Messages(Message.to_drop())

anomaly_score = np.mean(output)
return Messages(Message(tags=["out"], value=json.dumps({"score": anomaly_score}).encode()))
64 changes: 64 additions & 0 deletions examples/block_pipeline/src/train.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
import logging

import pandas as pd
from cachetools import TTLCache
from numalogic.blocks import (
BlockPipeline,
PreprocessBlock,
NNBlock,
ThresholdBlock,
PostprocessBlock,
)
from numalogic.models.autoencoder.variants import SparseVanillaAE
from numalogic.models.threshold import StdDevThreshold
from numalogic.numaflow import NumalogicUDF
from numalogic.registry import RedisRegistry
from numalogic.transforms import TanhNorm
from pynumaflow.function import Datum, Messages, Message
from sklearn.preprocessing import StandardScaler

from src.utils import RedisClient, TRAIN_DATA_PATH

_LOGGER = logging.getLogger(__name__)


class Train(NumalogicUDF):
"""UDF to train the model and save it to the registry."""

ttl_cache = TTLCache(maxsize=16, ttl=60)

def __init__(self, seq_len: int = 12, num_series: int = 1):
super().__init__()
self.seq_len = seq_len
self.n_features = num_series
self.registry = RedisRegistry(client=RedisClient().get_client())
self._model_key = "sparsevanillae"

def exec(self, keys: list[str], datum: Datum) -> Messages:
"""The train function here trains the model and saves it to the registry."""
# Check if a training message has been received very recently
if self._model_key in self.ttl_cache:
return Messages(Message.to_drop())
self.ttl_cache[self._model_key] = self._model_key

# Load Training data
data = pd.read_csv(TRAIN_DATA_PATH, index_col=None)

# Define the block pipeline
block_pl = BlockPipeline(
PreprocessBlock(StandardScaler()),
NNBlock(
SparseVanillaAE(seq_len=self.seq_len, n_features=self.n_features), self.seq_len
),
ThresholdBlock(StdDevThreshold()),
PostprocessBlock(TanhNorm()),
registry=self.registry,
)
block_pl.fit(data)

# Save the model to the registry
block_pl.save(skeys=["blockpl"], dkeys=["sparsevanillae"])
_LOGGER.info("Model saved to registry")

# Train vertex is the last vertex in the pipeline
return Messages(Message.to_drop())
42 changes: 42 additions & 0 deletions examples/block_pipeline/src/utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
import logging
import os

from redis.sentinel import Sentinel

from numalogic.tools.types import Singleton, redis_client_t

_LOGGER = logging.getLogger(__name__)
_DIR = os.path.dirname(__file__)
_ROOT_DIR = os.path.split(_DIR)[0]
TRAIN_DATA_PATH = os.path.join(_ROOT_DIR, "resources/train_data.csv")

AUTH = os.getenv("REDIS_AUTH")
HOST = os.getenv("REDIS_HOST", default="isbsvc-redis-isbs-redis-svc.default.svc")
PORT = os.getenv("REDIS_PORT", default="26379")
MASTERNAME = os.getenv("REDIS_MASTER_NAME", default="mymaster")


class RedisClient(metaclass=Singleton):
"""Singleton class to manage redis client."""

_client: redis_client_t = None

def __init__(self):
if not self._client:
self.set_client()

def set_client(self) -> None:
sentinel_args = {
"sentinels": [(HOST, PORT)],
"socket_timeout": 0.1,
}
_LOGGER.info("Connecting to redis sentinel: %s, %s, %s", sentinel_args, MASTERNAME, AUTH)
sentinel = Sentinel(
**sentinel_args,
sentinel_kwargs=dict(password=AUTH),
password=AUTH,
)
self._client = sentinel.master_for(MASTERNAME)

def get_client(self) -> redis_client_t:
return self._client
6 changes: 6 additions & 0 deletions examples/multi_udf/.dockerignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
.git/
__pycache__/
**/__pycache__/
*.py[cod]
*$py.class
.idea/
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
Loading

0 comments on commit 6b1e6a6

Please sign in to comment.