Skip to content

Commit

Permalink
fix test cases
Browse files Browse the repository at this point in the history
Signed-off-by: Kushal Batra <[email protected]>
  • Loading branch information
s0nicboOm committed Jun 12, 2024
1 parent 228888c commit 47f0ebd
Show file tree
Hide file tree
Showing 26 changed files with 151 additions and 108 deletions.
2 changes: 1 addition & 1 deletion numalogic/_constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
BASE_CONF_DIR = os.path.join(BASE_DIR, "config")

DEFAULT_BASE_CONF_PATH = os.path.join(BASE_CONF_DIR, "default-configs", "config.yaml")
DFAULT_METRICS_CONF_PATH = os.path.join(
DEFAULT_METRICS_CONF_PATH = os.path.join(
BASE_CONF_DIR, "default-configs", "numalogic_udf_metrics.yaml"
)
DEFAULT_APP_CONF_PATH = os.path.join(BASE_CONF_DIR, "app-configs", "config.yaml")
Expand Down
6 changes: 6 additions & 0 deletions numalogic/transforms/_stateless.py
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,12 @@ def _validate_args(
if len(lower) != len(upper):
raise ValueError("lower and upper should have the same length.")
lower, upper = np.asarray(lower, dtype=np.float32), np.asarray(upper, dtype=np.float32)
elif lower is not None and upper is not None:
if type(lower) is not type(upper):
if isinstance(lower, Sequence):
upper = np.asarray(upper, dtype=np.float32)
else:
lower = np.asarray(lower, dtype=np.float32)
if upper is not None and lower is not None and np.any(lower > upper):
raise ValueError("lower value should be less than or equal to upper value")
return lower, upper
Expand Down
8 changes: 8 additions & 0 deletions numalogic/udfs/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
from numalogic._constants import BASE_DIR
from numalogic.udfs._base import NumalogicUDF
from numalogic.udfs._config import StreamConf, PipelineConf, MLPipelineConf, load_pipeline_conf
from numalogic.udfs._metrics_utility import MetricsSingleton
from numalogic.udfs.factory import UDFFactory, ServerFactory
from numalogic.udfs.payloadtx import PayloadTransformer
from numalogic.udfs.inference import InferenceUDF
Expand All @@ -23,6 +24,11 @@ def set_logger() -> None:
logging.getLogger("root").setLevel(logging.DEBUG)


def set_metrics(conf_file: str) -> None:
"""Sets the metrics for the UDFs."""
MetricsSingleton().load_metrics(config_file_path=conf_file)


__all__ = [
"NumalogicUDF",
"PayloadTransformer",
Expand All @@ -40,4 +46,6 @@ def set_logger() -> None:
"load_pipeline_conf",
"ServerFactory",
"set_logger",
"set_metrics",
"MetricsSingleton",
]
7 changes: 4 additions & 3 deletions numalogic/udfs/__main__.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,16 +9,18 @@
DEFAULT_BASE_CONF_PATH,
DEFAULT_APP_CONF_PATH,
DEFAULT_METRICS_PORT,
DEFAULT_METRICS_CONF_PATH,
)
from numalogic.connectors.redis import get_redis_client_from_conf
from numalogic.udfs import load_pipeline_conf, UDFFactory, ServerFactory, set_logger
from numalogic.udfs import load_pipeline_conf, UDFFactory, ServerFactory, set_logger, set_metrics

LOGGER = logging.getLogger(__name__)

BASE_CONF_FILE_PATH: Final[str] = os.getenv("BASE_CONF_PATH", default=DEFAULT_BASE_CONF_PATH)
APP_CONF_FILE_PATH: Final[str] = os.getenv("APP_CONF_PATH", default=DEFAULT_APP_CONF_PATH)
METRICS_PORT: Final[int] = int(os.getenv("METRICS_PORT", default=DEFAULT_METRICS_PORT))
METRICS_ENABLED: Final[bool] = bool(os.getenv("METRICS_ENABLED", default="True"))
METRICS_CONF_PATH: Final[str] = os.getenv("METRICS_CONF_PATH", default=DEFAULT_METRICS_CONF_PATH)


def init_server(step: str, server_type: str):
Expand Down Expand Up @@ -51,9 +53,8 @@ def start_server() -> None:

server = init_server(step, server_type)
server.start()

if METRICS_ENABLED:
# Start the metrics server at port METRICS_PORT = 8490
set_metrics(conf_file=METRICS_CONF_PATH)
start_metrics_server(METRICS_PORT)


Expand Down
73 changes: 38 additions & 35 deletions numalogic/udfs/_metrics_utility.py
Original file line number Diff line number Diff line change
@@ -1,20 +1,23 @@
import os
from typing import Final, Optional
from typing import Optional, Any

from numaprom.monitoring.metrics import (
PromCounterMetric,
PromInfoMetric,
PromSummaryMetric,
PromGaugeMetric,
)
from numaprom.monitoring.utility import create_metrics_from_config_file
from numaprom.monitoring.utility import get_metric
from omegaconf import OmegaConf

from numalogic import LOGGER
from numalogic._constants import DFAULT_METRICS_CONF_PATH

METRICS_CONFIG_FILE_PATH: Final[str] = os.getenv(
"DEFAULT_METRICS_CONF_PATH", default=DFAULT_METRICS_CONF_PATH
)
def create_metrics_from_config_file(config_file_path: str) -> dict[str, Any]:
config = OmegaConf.load(config_file_path)
metrics = {}
for metric_config in config.get("numalogic_metrics", []):
metric_type = metric_config["type"]
for metric in metric_config["metrics"]:
name = metric["name"]
description = metric.get("description", "")
label_pairs = metric.get("label_pairs", {})
static_label_pairs = metric.get("static_label_pairs", {})
metrics[name] = get_metric(
metric_type, name, description, label_pairs, static_label_pairs
)
return metrics


class MetricsSingleton:
Expand All @@ -26,15 +29,19 @@ def __new__(cls, *args, **kwargs):
cls._instance = super().__new__(cls, *args, **kwargs)
return cls._instance

def load_metrics(self, config_file_path):
def load_metrics(self, config_file_path: str):
if not self._metrics:
if config_file_path is None:
raise ValueError("file path is required to load metrics")
self._metrics = create_metrics_from_config_file(config_file_path)

def get_metrics(self) -> dict:
return self._metrics


# helper functions
def _increment_counter(
counter: PromCounterMetric, labels: Optional[dict], amount: int = 1, is_enabled=True
counter: str, labels: Optional[dict], amount: int = 1, is_enabled=True
) -> None:
"""
Utility function is used to increment the counter.
Expand All @@ -44,11 +51,12 @@ def _increment_counter(
labels: dict of label keys, value pair
amount: Amount to increment the counter by
"""
if is_enabled:
counter.increment_counter(labels=labels, amount=amount)
_metrics = MetricsSingleton().get_metrics()
if is_enabled and counter in _metrics:
_metrics[counter].increment_counter(labels=labels, amount=amount)


def _add_info(info: PromInfoMetric, labels: Optional[dict], data: dict, is_enabled=True) -> None:
def _add_info(info: str, labels: Optional[dict], data: dict, is_enabled=True) -> None:
"""
Utility function is used to add the info.
Expand All @@ -57,13 +65,12 @@ def _add_info(info: PromInfoMetric, labels: Optional[dict], data: dict, is_enabl
labels: dict of label keys, value pair
data: Dictionary of data
"""
if is_enabled:
info.add_info(labels=labels, data=data)
_metrics = MetricsSingleton().get_metrics()
if is_enabled and info in _metrics:
_metrics[info].add_info(labels=labels, data=data)


def _add_summary(
summary: PromSummaryMetric, labels: Optional[dict], data: float, is_enabled=True
) -> None:
def _add_summary(summary: str, labels: Optional[dict], data: float, is_enabled=True) -> None:
"""
Utility function is used to add the summary.
Expand All @@ -72,23 +79,19 @@ def _add_summary(
labels: dict of labels key, value pair
data: Summary value
"""
if is_enabled:
summary.add_observation(labels=labels, value=data)
_metrics = MetricsSingleton().get_metrics()
if is_enabled and summary in _metrics:
_metrics[summary].add_observation(labels=labels, value=data)


def _set_gauge(
gauge: PromGaugeMetric, labels: Optional[dict], data: float, is_enabled=True
) -> None:
def _set_gauge(gauge: str, labels: Optional[dict], data: float, is_enabled=True) -> None:
"""
Utility function is used to add the info.
Args:
gauge: Gauge object
labels: dict of label keys, value pair
data: data.
"""
if is_enabled:
gauge.set_gauge(labels=labels, data=data)


LOGGER.info("Loading metrics from config file: %s", METRICS_CONFIG_FILE_PATH)
_METRICS = MetricsSingleton().load_metrics(METRICS_CONFIG_FILE_PATH)
_metrics = MetricsSingleton().get_metrics()
if is_enabled and gauge in _metrics:
_metrics[gauge].set_gauge(labels=labels, data=data)
8 changes: 4 additions & 4 deletions numalogic/udfs/inference.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
from numalogic.udfs._config import PipelineConf

from numalogic.udfs._logger import configure_logger, log_data_payload_values
from numalogic.udfs._metrics_utility import _increment_counter, _METRICS
from numalogic.udfs._metrics_utility import _increment_counter
from numalogic.udfs.entities import StreamPayload, Status
from numalogic.udfs.tools import (
_load_artifact,
Expand Down Expand Up @@ -111,7 +111,7 @@ def exec(self, keys: list[str], datum: Datum) -> Messages:
"pipeline_id": payload.pipeline_id,
}
_increment_counter(
counter=_METRICS["MSG_IN_COUNTER"],
counter="MSG_IN_COUNTER",
labels=_metric_label_values,
is_enabled=METRICS_ENABLED,
)
Expand Down Expand Up @@ -144,7 +144,7 @@ def exec(self, keys: list[str], datum: Datum) -> Messages:
_update_info_metric(x_inferred, payload.metrics, _metric_label_values)
except RuntimeError:
_increment_counter(
counter=_METRICS["RUNTIME_ERROR_COUNTER"],
counter="RUNTIME_ERROR_COUNTER",
labels=_metric_label_values,
is_enabled=METRICS_ENABLED,
)
Expand Down Expand Up @@ -180,7 +180,7 @@ def exec(self, keys: list[str], datum: Datum) -> Messages:
msgs.append(get_trainer_message(keys, _stream_conf, payload, **_metric_label_values))

_increment_counter(
counter=_METRICS["MSG_PROCESSED_COUNTER"],
counter="MSG_PROCESSED_COUNTER",
labels=_metric_label_values,
is_enabled=METRICS_ENABLED,
)
Expand Down
6 changes: 3 additions & 3 deletions numalogic/udfs/payloadtx.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
from numalogic.udfs import NumalogicUDF
from numalogic.udfs._config import PipelineConf
from numalogic.udfs._logger import configure_logger, log_data_payload_values
from numalogic.udfs._metrics_utility import _increment_counter, _METRICS
from numalogic.udfs._metrics_utility import _increment_counter

METRICS_ENABLED = os.getenv("METRICS_ENABLED", "True").lower() == "true"

Expand Down Expand Up @@ -65,7 +65,7 @@ def exec(self, keys: list[str], datum: Datum) -> Messages:
"pipeline_id": data_payload["pipeline_id"],
}
_increment_counter(
counter=_METRICS["MSG_IN_COUNTER"],
counter="MSG_IN_COUNTER",
labels=_metric_label_values,
is_enabled=METRICS_ENABLED,
)
Expand All @@ -84,7 +84,7 @@ def exec(self, keys: list[str], datum: Datum) -> Messages:
execution_time_ms=round((time.perf_counter() - _start_time) * 1000, 4),
)
_increment_counter(
counter=_METRICS["MSG_PROCESSED_COUNTER"],
counter="MSG_PROCESSED_COUNTER",
labels=_metric_label_values,
is_enabled=METRICS_ENABLED,
)
Expand Down
8 changes: 4 additions & 4 deletions numalogic/udfs/postprocess.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
from numalogic.udfs import NumalogicUDF
from numalogic.udfs._config import PipelineConf, MLPipelineConf
from numalogic.udfs._logger import configure_logger, log_data_payload_values
from numalogic.udfs._metrics_utility import _increment_counter, _METRICS
from numalogic.udfs._metrics_utility import _increment_counter
from numalogic.udfs.entities import StreamPayload, Header, Status, OutputPayload
from numalogic.udfs.tools import _load_artifact, get_trainer_message, get_static_thresh_message

Expand Down Expand Up @@ -94,7 +94,7 @@ def exec(self, keys: list[str], datum: Datum) -> Messages:
"pipeline_id": payload.pipeline_id,
}
_increment_counter(
counter=_METRICS["MSG_IN_COUNTER"],
counter="MSG_IN_COUNTER",
labels=_metric_label_values,
is_enabled=METRICS_ENABLED,
)
Expand Down Expand Up @@ -157,7 +157,7 @@ def exec(self, keys: list[str], datum: Datum) -> Messages:

except RuntimeError:
_increment_counter(
_METRICS["RUNTIME_ERROR_COUNTER"], _metric_label_values, is_enabled=METRICS_ENABLED
"RUNTIME_ERROR_COUNTER", _metric_label_values, is_enabled=METRICS_ENABLED
)
logger.exception(
"Runtime postprocess error!",
Expand Down Expand Up @@ -197,7 +197,7 @@ def exec(self, keys: list[str], datum: Datum) -> Messages:
)

_increment_counter(
_METRICS["MSG_PROCESSED_COUNTER"],
"MSG_PROCESSED_COUNTER",
labels=_metric_label_values,
is_enabled=METRICS_ENABLED,
)
Expand Down
14 changes: 7 additions & 7 deletions numalogic/udfs/preprocess.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
from numalogic.tools.types import redis_client_t, artifact_t
from numalogic.udfs import NumalogicUDF
from numalogic.udfs._config import PipelineConf
from numalogic.udfs._metrics_utility import _increment_counter, _METRICS
from numalogic.udfs._metrics_utility import _increment_counter
from numalogic.udfs.entities import Status, Header
from numalogic.udfs.tools import (
make_stream_payload,
Expand Down Expand Up @@ -123,20 +123,20 @@ def exec(self, keys: list[str], datum: Datum) -> Messages:
}

_increment_counter(
counter=_METRICS["MSG_IN_COUNTER"],
counter="MSG_IN_COUNTER",
labels=_metric_label_values,
is_enabled=METRICS_ENABLED,
)
# Drop message if dataframe shape conditions are not met
if raw_df.shape[0] < _stream_conf.window_size or raw_df.shape[1] != len(_conf.metrics):
logger.critical("Dataframe shape conditions not met ", raw_df_shape=raw_df.shape)
_increment_counter(
counter=_METRICS["DATASHAPE_ERROR_COUNTER"],
counter="DATASHAPE_ERROR_COUNTER",
labels=_metric_label_values,
is_enabled=METRICS_ENABLED,
)
_increment_counter(
counter=_METRICS["MSG_DROPPED_COUNTER"],
counter="MSG_DROPPED_COUNTER",
labels=_metric_label_values,
is_enabled=METRICS_ENABLED,
)
Expand Down Expand Up @@ -174,7 +174,7 @@ def exec(self, keys: list[str], datum: Datum) -> Messages:
else:
# Load configuration for the config_id
_increment_counter(
_METRICS["SOURCE_COUNTER"],
"SOURCE_COUNTER",
labels=({"artifact_source": "config"} | _metric_label_values),
is_enabled=METRICS_ENABLED,
)
Expand Down Expand Up @@ -204,7 +204,7 @@ def exec(self, keys: list[str], datum: Datum) -> Messages:
)
except RuntimeError:
_increment_counter(
counter=_METRICS["RUNTIME_ERROR_COUNTER"],
counter="RUNTIME_ERROR_COUNTER",
labels=_metric_label_values,
is_enabled=METRICS_ENABLED,
)
Expand All @@ -227,7 +227,7 @@ def exec(self, keys: list[str], datum: Datum) -> Messages:
return msgs

_increment_counter(
counter=_METRICS["MSG_PROCESSED_COUNTER"],
counter="MSG_PROCESSED_COUNTER",
labels=_metric_label_values,
is_enabled=METRICS_ENABLED,
)
Expand Down
8 changes: 4 additions & 4 deletions numalogic/udfs/staticthresh.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
import numpy.typing as npt

from numalogic.udfs._logger import configure_logger, log_data_payload_values
from numalogic.udfs._metrics_utility import _increment_counter, _METRICS
from numalogic.udfs._metrics_utility import _increment_counter
from numalogic.udfs.entities import StreamPayload, OutputPayload

METRICS_ENABLED = os.getenv("METRICS_ENABLED", "True").lower() == "true"
Expand Down Expand Up @@ -54,7 +54,7 @@ def exec(self, keys: list[str], datum: Datum) -> Messages:
"pipeline_id": payload.pipeline_id,
}
_increment_counter(
counter=_METRICS["MSG_IN_COUNTER"],
counter="MSG_IN_COUNTER",
labels=_metric_label_values,
is_enabled=METRICS_ENABLED,
)
Expand All @@ -74,7 +74,7 @@ def exec(self, keys: list[str], datum: Datum) -> Messages:
except RuntimeError:
logger.exception("Error occurred while computing static anomaly scores")
_increment_counter(
_METRICS["RUNTIME_ERROR_COUNTER"], _metric_label_values, is_enabled=METRICS_ENABLED
"RUNTIME_ERROR_COUNTER", _metric_label_values, is_enabled=METRICS_ENABLED
)
return Messages(Message.to_drop())

Expand All @@ -95,7 +95,7 @@ def exec(self, keys: list[str], datum: Datum) -> Messages:
y_features=y_features,
)
_increment_counter(
counter=_METRICS["MSG_PROCESSED_COUNTER"],
counter="MSG_PROCESSED_COUNTER",
labels=_metric_label_values,
is_enabled=METRICS_ENABLED,
)
Expand Down
Loading

0 comments on commit 47f0ebd

Please sign in to comment.