diff --git a/README.md b/README.md index 0eeab84..3260516 100644 --- a/README.md +++ b/README.md @@ -1,7 +1,6 @@ -# PyUdisk (Linux) - -PyUdisk is a wrapper around the `udisksctl` command-line tool to generate a comprehensive list of all block devices and drives on a Linux machines. +# PyUdisk +PyUdisk is a python module to generate a S.M.A.R.T metrics for all drives/partitions on macOS and Linux machines. ### Installation @@ -46,7 +45,7 @@ pyudisk start > _By default, `PyUdisk` will look for a `.env` file in the current working directory._ -- **UDISK_LIB**: Path to the `udisksctl` command-line tool. Default: `/usr/bin/udisksctl` +- **SMART_LIB**: Path to the `udisksctl` command-line tool. Default: `/usr/bin/udisksctl` - **METRICS**: List of metrics to monitor. Default: `[]` - **GMAIL_USER**: Gmail username to authenticate SMTP library. - **GMAIL_PASS**: Gmail password to authenticate SMTP library. diff --git a/pyudisk/__init__.py b/pyudisk/__init__.py index 5613357..1ea47ae 100644 --- a/pyudisk/__init__.py +++ b/pyudisk/__init__.py @@ -7,7 +7,7 @@ from .main import EnvConfig, generate_report, monitor, smart_metrics # noqa: F401 -version = "0.2.0a1" +version = "0.2.0" @click.command() diff --git a/pyudisk/main.py b/pyudisk/main.py index dc5d115..a45e9e3 100644 --- a/pyudisk/main.py +++ b/pyudisk/main.py @@ -17,8 +17,6 @@ from .support import humanize_usage_metrics, load_dump, load_partitions from .util import standard -# todo: Extend usage for Darwin OS to monitor - def get_disk(env: EnvConfig) -> Generator[sdiskpart]: """Gathers disk information using the 'psutil' library. @@ -287,42 +285,6 @@ def smart_metrics(env: EnvConfig) -> Generator[linux.Disk | darwin.Disk]: yield linux.Disk(id=drive, model=data.get("Info", {}).get("Model", ""), **data) -def monitor_disk(env: EnvConfig) -> Generator[linux.Disk]: - """Monitors disk attributes based on the configuration. - - Args: - env: Environment variables configuration. - - Yields: - Disk: - Data structure parsed as a Disk object. - """ - message = "" - for disk in smart_metrics(env): - if disk.Attributes: - for metric in env.metrics: - attribute = disk.Attributes.model_dump().get(metric.attribute) - if metric.max_threshold and attribute >= metric.max_threshold: - msg = f"{metric.attribute!r} for {disk.id!r} is >= {metric.max_threshold} at {attribute}" - LOGGER.critical(msg) - message += msg + "\n" - if metric.min_threshold and attribute <= metric.min_threshold: - msg = f"{metric.attribute!r} for {disk.id!r} is <= {metric.min_threshold} at {attribute}" - LOGGER.critical(msg) - message += msg + "\n" - if metric.equal_match and attribute != metric.equal_match: - msg = f"{metric.attribute!r} for {disk.id!r} IS NOT {metric.equal_match} at {attribute}" - LOGGER.critical(msg) - message += msg + "\n" - else: - LOGGER.warning("No attributes were loaded for %s", disk.model) - yield disk - if message: - notification_service( - title="Disk Monitor Alert!!", message=message, env_config=env - ) - - def generate_html( data: List[Dict[str, str | int | float | bool]], filepath: NewPath = None ) -> str | NoReturn: @@ -388,12 +350,50 @@ def generate_report(**kwargs) -> str: return report_file +def monitor_disk(env: EnvConfig) -> Generator[linux.Disk]: + """Monitors disk attributes based on the configuration. + + Args: + env: Environment variables configuration. + + Yields: + Disk: + Data structure parsed as a Disk object. + """ + assert OPERATING_SYSTEM == OperationSystem.linux, "Monitoring feature is available only for Linux machines!!" + message = "" + for disk in smart_metrics(env): + if disk.Attributes: + for metric in env.metrics: + attribute = disk.Attributes.model_dump().get(metric.attribute) + if metric.max_threshold and attribute >= metric.max_threshold: + msg = f"{metric.attribute!r} for {disk.id!r} is >= {metric.max_threshold} at {attribute}" + LOGGER.critical(msg) + message += msg + "\n" + if metric.min_threshold and attribute <= metric.min_threshold: + msg = f"{metric.attribute!r} for {disk.id!r} is <= {metric.min_threshold} at {attribute}" + LOGGER.critical(msg) + message += msg + "\n" + if metric.equal_match and attribute != metric.equal_match: + msg = f"{metric.attribute!r} for {disk.id!r} IS NOT {metric.equal_match} at {attribute}" + LOGGER.critical(msg) + message += msg + "\n" + else: + LOGGER.warning("No attributes were loaded for %s", disk.model) + yield disk + if message: + notification_service( + title="Disk Monitor Alert!!", message=message, env_config=env + ) + + def monitor(**kwargs) -> None: """Entrypoint for the disk monitoring service. Args: **kwargs: Arbitrary keyword arguments. """ + assert OPERATING_SYSTEM == OperationSystem.linux, "Monitoring feature is available only for Linux machines!!" env = EnvConfig(**kwargs) disk_report = [disk.model_dump() for disk in monitor_disk(env)] if disk_report: