From 99124ff8b1937ccfe6e42ea4e6219eee56a88d35 Mon Sep 17 00:00:00 2001 From: Dominik Hoffmann Date: Mon, 2 Dec 2024 20:13:25 +0100 Subject: [PATCH 1/4] #26: Changed dataframe format to PySparkDataFrame Signed-off-by: Dominik Hoffmann --- .../spark/dataframe/dataframe_log_handler.py | 44 ++++++++++++------- .../logging/spark/runtime_log_collector.py | 30 ++++++------- .../pipelines/logging/test_log_collection.py | 11 ++--- 3 files changed, 47 insertions(+), 38 deletions(-) diff --git a/src/sdk/python/rtdip_sdk/pipelines/logging/spark/dataframe/dataframe_log_handler.py b/src/sdk/python/rtdip_sdk/pipelines/logging/spark/dataframe/dataframe_log_handler.py index cb8af94fc..096e87615 100644 --- a/src/sdk/python/rtdip_sdk/pipelines/logging/spark/dataframe/dataframe_log_handler.py +++ b/src/sdk/python/rtdip_sdk/pipelines/logging/spark/dataframe/dataframe_log_handler.py @@ -13,14 +13,17 @@ # limitations under the License. import logging -import pandas -from pandas import DataFrame +from pyspark.sql import DataFrame as PySparkDataFrame from datetime import datetime +from pyspark.sql.connect.session import SparkSession +from pyspark.sql.types import StructField, TimestampType, StringType, StructType, Row + class DataFrameLogHandler(logging.Handler): """ Handles logs from attached logger and stores them in a DataFrame at runtime + Uses the following format: {Timestamp, Logger Name, Logging Level, Log Message} Args: logging.Handler: Inherits from logging.Handler @@ -37,25 +40,34 @@ class DataFrameLogHandler(logging.Handler): """ - logs_df: DataFrame = None + logs_df: PySparkDataFrame = None + spark: SparkSession + + def __init__(self, spark: SparkSession): + self.spark = spark + schema = StructType( + [ + StructField("timestamp", TimestampType(), True), + StructField("name", StringType(), True), + StructField("level", StringType(), True), + StructField("message", StringType(), True), + ] + ) - def __init__(self): - self.logs_df = DataFrame(columns=["timestamp", "name", "level", "message"]) + self.logs_df = self.spark.createDataFrame([], schema) + print(self.logs_df) super().__init__() def emit(self, record: logging.LogRecord) -> None: """Process and store a log record""" - log_entry = { - "timestamp": datetime.fromtimestamp(record.created), - "name": record.name, - "level": record.levelname, - "message": record.msg, - } - - new_log_df_row = pandas.DataFrame( - log_entry, columns=["timestamp", "name", "level", "message"], index=[0] + new_log_entry = Row( + timestamp=datetime.fromtimestamp(record.created), + name=record.name, + level=record.levelname, + message=record.msg, ) - self.logs_df = pandas.concat([self.logs_df, new_log_df_row], ignore_index=True) - def get_logs_as_df(self) -> DataFrame: + self.logs_df = self.logs_df.union(self.spark.createDataFrame([new_log_entry])) + + def get_logs_as_df(self) -> PySparkDataFrame: return self.logs_df diff --git a/src/sdk/python/rtdip_sdk/pipelines/logging/spark/runtime_log_collector.py b/src/sdk/python/rtdip_sdk/pipelines/logging/spark/runtime_log_collector.py index eb0091761..ae75079c8 100644 --- a/src/sdk/python/rtdip_sdk/pipelines/logging/spark/runtime_log_collector.py +++ b/src/sdk/python/rtdip_sdk/pipelines/logging/spark/runtime_log_collector.py @@ -1,7 +1,7 @@ import os -from pandas import DataFrame -from pandas.io.common import file_path_to_url +from pyspark.sql import DataFrame as PySparkDataFrame +from pyspark.sql.connect.session import SparkSession from src.sdk.python.rtdip_sdk.pipelines._pipeline_utils.models import ( Libraries, @@ -21,15 +21,14 @@ 
class RuntimeLogCollector(LoggingBaseInterface): """Collects logs from all loggers in the LoggerManager at runtime.""" logger_manager: LoggerManager = LoggerManager() - df_handler: DataFrameLogHandler = DataFrameLogHandler() + df_handler: DataFrameLogHandler - def __init__(self): - pass + def __init__(self, spark: SparkSession): + self.df_handler = DataFrameLogHandler(spark) - @classmethod - def get_logs_as_df(cls) -> DataFrame: + def get_logs_as_df(self) -> PySparkDataFrame: """Return the DataFrame containing the logs""" - return cls.df_handler.get_logs_as_df() + return self.df_handler.get_logs_as_df() @staticmethod def libraries(): @@ -44,24 +43,21 @@ def settings() -> dict: def system_type() -> SystemType: pass - @classmethod - def _attach_dataframe_handler_to_loggers(cls) -> None: + def _attach_dataframe_handler_to_loggers(self) -> None: """Attaches the DataFrameLogHandler to the logger.""" - - loggers = cls.logger_manager.get_all_loggers() + loggers = self.logger_manager.get_all_loggers() for logger in loggers.values(): # avoid duplicate handlers - if cls.df_handler not in logger.handlers: - logger.addHandler(cls.df_handler) + if self.df_handler not in logger.handlers: + logger.addHandler(self.df_handler) - @classmethod def _attach_file_handler_to_loggers( - cls, filename: str, path: str = ".", mode: str = "a" + self, filename: str, path: str = ".", mode: str = "a" ) -> None: """Attaches the FileLogHandler to the logger.""" - loggers = cls.logger_manager.get_all_loggers() + loggers = self.logger_manager.get_all_loggers() file_path = os.path.join(path, filename) file_handler = FileLogHandler(file_path, mode) for logger in loggers.values(): diff --git a/tests/sdk/python/rtdip_sdk/pipelines/logging/test_log_collection.py b/tests/sdk/python/rtdip_sdk/pipelines/logging/test_log_collection.py index f6a4adacf..45fdd971f 100644 --- a/tests/sdk/python/rtdip_sdk/pipelines/logging/test_log_collection.py +++ b/tests/sdk/python/rtdip_sdk/pipelines/logging/test_log_collection.py @@ -38,20 +38,21 @@ def spark(): spark.stop() -def test_logger_manager_basic_function(): +def test_logger_manager_basic_function(spark): df = DataFrame() monitor = IdentifyMissingDataInterval( df=df, interval="10s", tolerance="500ms", ) - log_collector = RuntimeLogCollector() + log_collector = RuntimeLogCollector(spark) assert monitor.logger_manager is log_collector.logger_manager def test_df_output(spark, caplog): - log_collector = RuntimeLogCollector() + + log_collector = RuntimeLogCollector(spark) data = [ (1, "2024-02-11 00:00:00.000"), (2, "2024-02-11 00:00:10.000"), @@ -78,12 +79,12 @@ def test_df_output(spark, caplog): result_df = log_collector.get_logs_as_df() - assert result_df.shape[0] == 6 + assert result_df.count() == 6 def test_file_logging(spark, caplog): - log_collector = RuntimeLogCollector() + log_collector = RuntimeLogCollector(spark) data = [ (1, "2024-02-11 00:00:00.000"), (2, "2024-02-11 00:00:10.000"), From 41c4eb3e1d53e99495d089c725aff61a367e7d89 Mon Sep 17 00:00:00 2001 From: Dominik Hoffmann Date: Mon, 2 Dec 2024 20:50:48 +0100 Subject: [PATCH 2/4] #26: Fixed that for each logger a separate df is returned Signed-off-by: Dominik Hoffmann --- .../rtdip_sdk/pipelines/logging/interfaces.py | 2 +- .../spark/dataframe/dataframe_log_handler.py | 1 - .../logging/spark/runtime_log_collector.py | 29 +++++++++---------- .../pipelines/logging/test_log_collection.py | 8 +++-- 4 files changed, 21 insertions(+), 19 deletions(-) diff --git a/src/sdk/python/rtdip_sdk/pipelines/logging/interfaces.py 
b/src/sdk/python/rtdip_sdk/pipelines/logging/interfaces.py index ff6a62dcc..9d4aae446 100644 --- a/src/sdk/python/rtdip_sdk/pipelines/logging/interfaces.py +++ b/src/sdk/python/rtdip_sdk/pipelines/logging/interfaces.py @@ -20,5 +20,5 @@ class LoggingBaseInterface(PipelineComponentBaseInterface): @abstractmethod - def get_logs_as_df(self) -> DataFrame: + def get_logs_as_df(self, logger_name: str) -> DataFrame: pass diff --git a/src/sdk/python/rtdip_sdk/pipelines/logging/spark/dataframe/dataframe_log_handler.py b/src/sdk/python/rtdip_sdk/pipelines/logging/spark/dataframe/dataframe_log_handler.py index 096e87615..be0718c96 100644 --- a/src/sdk/python/rtdip_sdk/pipelines/logging/spark/dataframe/dataframe_log_handler.py +++ b/src/sdk/python/rtdip_sdk/pipelines/logging/spark/dataframe/dataframe_log_handler.py @@ -55,7 +55,6 @@ def __init__(self, spark: SparkSession): ) self.logs_df = self.spark.createDataFrame([], schema) - print(self.logs_df) super().__init__() def emit(self, record: logging.LogRecord) -> None: diff --git a/src/sdk/python/rtdip_sdk/pipelines/logging/spark/runtime_log_collector.py b/src/sdk/python/rtdip_sdk/pipelines/logging/spark/runtime_log_collector.py index ae75079c8..330a07431 100644 --- a/src/sdk/python/rtdip_sdk/pipelines/logging/spark/runtime_log_collector.py +++ b/src/sdk/python/rtdip_sdk/pipelines/logging/spark/runtime_log_collector.py @@ -17,18 +17,15 @@ ) -class RuntimeLogCollector(LoggingBaseInterface): +class RuntimeLogCollector: """Collects logs from all loggers in the LoggerManager at runtime.""" logger_manager: LoggerManager = LoggerManager() - df_handler: DataFrameLogHandler - def __init__(self, spark: SparkSession): - self.df_handler = DataFrameLogHandler(spark) + spark: SparkSession - def get_logs_as_df(self) -> PySparkDataFrame: - """Return the DataFrame containing the logs""" - return self.df_handler.get_logs_as_df() + def __init__(self, spark: SparkSession): + self.spark = spark @staticmethod def libraries(): @@ -43,14 +40,16 @@ def settings() -> dict: def system_type() -> SystemType: pass - def _attach_dataframe_handler_to_loggers(self) -> None: - """Attaches the DataFrameLogHandler to the logger.""" - loggers = self.logger_manager.get_all_loggers() - - for logger in loggers.values(): - # avoid duplicate handlers - if self.df_handler not in logger.handlers: - logger.addHandler(self.df_handler) + def _attach_dataframe_handler_to_logger( + self, logger_name: str + ) -> DataFrameLogHandler: + """Attaches the DataFrameLogHandler to the logger. 
Returns True if the handler was attached, False otherwise.""" + logger = self.logger_manager.get_logger(logger_name) + df_log_handler = DataFrameLogHandler(self.spark) + if logger is not None: + if df_log_handler not in logger.handlers: + logger.addHandler(df_log_handler) + return df_log_handler def _attach_file_handler_to_loggers( self, filename: str, path: str = ".", mode: str = "a" diff --git a/tests/sdk/python/rtdip_sdk/pipelines/logging/test_log_collection.py b/tests/sdk/python/rtdip_sdk/pipelines/logging/test_log_collection.py index 45fdd971f..24b72e43b 100644 --- a/tests/sdk/python/rtdip_sdk/pipelines/logging/test_log_collection.py +++ b/tests/sdk/python/rtdip_sdk/pipelines/logging/test_log_collection.py @@ -14,6 +14,7 @@ import os import pytest + from pandas import DataFrame from pyspark.sql import SparkSession @@ -67,17 +68,20 @@ def test_df_output(spark, caplog): ] columns = ["Index", "EventTime"] df = spark.createDataFrame(data, schema=columns) + monitor = IdentifyMissingDataInterval( df=df, interval="10s", tolerance="500ms", ) - log_collector._attach_dataframe_handler_to_loggers() + log_handler = log_collector._attach_dataframe_handler_to_logger( + "IdentifyMissingDataInterval" + ) with caplog.at_level(logging.INFO, logger="IdentifyMissingDataInterval"): monitor.check() - result_df = log_collector.get_logs_as_df() + result_df = log_handler.get_logs_as_df() assert result_df.count() == 6 From 816d6437f8fbf76e8acde34251ee6e88b7eea2da Mon Sep 17 00:00:00 2001 From: Dominik Hoffmann Date: Mon, 2 Dec 2024 20:55:14 +0100 Subject: [PATCH 3/4] #26: Fixed test case that for each logger a separate df is returned Signed-off-by: Dominik Hoffmann --- .../pipelines/logging/test_log_collection.py | 39 ++++++++++++++++++- 1 file changed, 38 insertions(+), 1 deletion(-) diff --git a/tests/sdk/python/rtdip_sdk/pipelines/logging/test_log_collection.py b/tests/sdk/python/rtdip_sdk/pipelines/logging/test_log_collection.py index 24b72e43b..27bbff1fb 100644 --- a/tests/sdk/python/rtdip_sdk/pipelines/logging/test_log_collection.py +++ b/tests/sdk/python/rtdip_sdk/pipelines/logging/test_log_collection.py @@ -18,6 +18,7 @@ from pandas import DataFrame from pyspark.sql import SparkSession +from src.sdk.python.rtdip_sdk.pipelines.logging.logger_manager import LoggerManager from src.sdk.python.rtdip_sdk.pipelines.logging.spark.runtime_log_collector import ( RuntimeLogCollector, ) @@ -52,7 +53,6 @@ def test_logger_manager_basic_function(spark): def test_df_output(spark, caplog): - log_collector = RuntimeLogCollector(spark) data = [ (1, "2024-02-11 00:00:00.000"), @@ -86,6 +86,43 @@ def test_df_output(spark, caplog): assert result_df.count() == 6 +def test_unique_dataframes(spark, caplog): + log_collector = RuntimeLogCollector(spark) + data = [ + (1, "2024-02-11 00:00:00.000"), + (2, "2024-02-11 00:00:10.000"), + (3, "2024-02-11 00:00:20.000"), + (4, "2024-02-11 00:00:36.000"), # Missing interval (20s to 36s) + (5, "2024-02-11 00:00:45.000"), + (6, "2024-02-11 00:00:55.000"), + (7, "2024-02-11 00:01:05.000"), + (8, "2024-02-11 00:01:15.000"), + (9, "2024-02-11 00:01:25.000"), + (10, "2024-02-11 00:01:41.000"), # Missing interval (25s to 41s) + ] + columns = ["Index", "EventTime"] + df = spark.createDataFrame(data, schema=columns) + logger = LoggerManager().create_logger("Test_Logger") + monitor = IdentifyMissingDataInterval( + df=df, + interval="10s", + tolerance="500ms", + ) + log_handler_identify_missing_data_interval = ( + log_collector._attach_dataframe_handler_to_logger("IdentifyMissingDataInterval") + ) 
+ + log_handler_test = log_collector._attach_dataframe_handler_to_logger("Test_Logger") + + with caplog.at_level(logging.INFO, logger="IdentifyMissingDataInterval"): + monitor.check() + + result_df = log_handler_identify_missing_data_interval.get_logs_as_df() + result_df_test = log_handler_test.get_logs_as_df() + + assert result_df.count() != result_df_test.count() + + def test_file_logging(spark, caplog): log_collector = RuntimeLogCollector(spark) From a429f761955d2defdb2b374fd22c42f5ede3ed4c Mon Sep 17 00:00:00 2001 From: Dominik Hoffmann Date: Mon, 2 Dec 2024 21:01:40 +0100 Subject: [PATCH 4/4] #26: Fixed imports Signed-off-by: Dominik Hoffmann --- .../logging/spark/dataframe/dataframe_log_handler.py | 4 ++-- .../pipelines/logging/spark/log_file/file_log_handler.py | 2 +- .../pipelines/logging/spark/runtime_log_collector.py | 5 ++--- 3 files changed, 5 insertions(+), 6 deletions(-) diff --git a/src/sdk/python/rtdip_sdk/pipelines/logging/spark/dataframe/dataframe_log_handler.py b/src/sdk/python/rtdip_sdk/pipelines/logging/spark/dataframe/dataframe_log_handler.py index be0718c96..4d9c15f40 100644 --- a/src/sdk/python/rtdip_sdk/pipelines/logging/spark/dataframe/dataframe_log_handler.py +++ b/src/sdk/python/rtdip_sdk/pipelines/logging/spark/dataframe/dataframe_log_handler.py @@ -13,10 +13,10 @@ # limitations under the License. import logging -from pyspark.sql import DataFrame as PySparkDataFrame +from pyspark.sql import DataFrame as PySparkDataFrame, SparkSession from datetime import datetime -from pyspark.sql.connect.session import SparkSession + from pyspark.sql.types import StructField, TimestampType, StringType, StructType, Row diff --git a/src/sdk/python/rtdip_sdk/pipelines/logging/spark/log_file/file_log_handler.py b/src/sdk/python/rtdip_sdk/pipelines/logging/spark/log_file/file_log_handler.py index 1c1158023..9b9e29e72 100644 --- a/src/sdk/python/rtdip_sdk/pipelines/logging/spark/log_file/file_log_handler.py +++ b/src/sdk/python/rtdip_sdk/pipelines/logging/spark/log_file/file_log_handler.py @@ -13,7 +13,7 @@ # limitations under the License. import logging -import pandas + from pandas import DataFrame from datetime import datetime diff --git a/src/sdk/python/rtdip_sdk/pipelines/logging/spark/runtime_log_collector.py b/src/sdk/python/rtdip_sdk/pipelines/logging/spark/runtime_log_collector.py index 330a07431..712e8ec4c 100644 --- a/src/sdk/python/rtdip_sdk/pipelines/logging/spark/runtime_log_collector.py +++ b/src/sdk/python/rtdip_sdk/pipelines/logging/spark/runtime_log_collector.py @@ -1,13 +1,12 @@ import os -from pyspark.sql import DataFrame as PySparkDataFrame -from pyspark.sql.connect.session import SparkSession +from pyspark.sql import SparkSession from src.sdk.python.rtdip_sdk.pipelines._pipeline_utils.models import ( Libraries, SystemType, ) -from src.sdk.python.rtdip_sdk.pipelines.logging.interfaces import LoggingBaseInterface + from src.sdk.python.rtdip_sdk.pipelines.logging.logger_manager import LoggerManager from src.sdk.python.rtdip_sdk.pipelines.logging.spark.dataframe.dataframe_log_handler import ( DataFrameLogHandler,
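
For reference, a minimal sketch of how the per-logger log collection reworked in this series can be exercised, assuming a local SparkSession and the repository-style import paths used in the tests above. The logger name "ExampleComponent" and the Spark master setting are illustrative only, and the sketch follows the test code rather than any documented public API.

import logging

from pyspark.sql import SparkSession

from src.sdk.python.rtdip_sdk.pipelines.logging.logger_manager import LoggerManager
from src.sdk.python.rtdip_sdk.pipelines.logging.spark.runtime_log_collector import (
    RuntimeLogCollector,
)

# A local session for illustration; any existing SparkSession works.
spark = (
    SparkSession.builder.master("local[1]").appName("log-collection").getOrCreate()
)

# Register a named logger through the LoggerManager singleton (as the tests do).
logger = LoggerManager().create_logger("ExampleComponent")
logger.setLevel(logging.INFO)

# After this series, each call attaches a fresh DataFrameLogHandler to exactly
# one logger, so every logger accumulates its own PySpark DataFrame of records.
collector = RuntimeLogCollector(spark)
handler = collector._attach_dataframe_handler_to_logger("ExampleComponent")

logger.info("check started")
logger.warning("missing interval detected")

# Rows follow the schema {timestamp, name, level, message} defined in
# dataframe_log_handler.py; the two log calls above yield two rows.
logs_df = handler.get_logs_as_df()
logs_df.show(truncate=False)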