From 5b5e6c7a4d046fe6f59070ed111fa5d258134e3e Mon Sep 17 00:00:00 2001 From: MikhailBurdukov <102754618+MikhailBurdukov@users.noreply.github.com> Date: Wed, 27 Sep 2023 13:59:03 +0300 Subject: [PATCH] Loguru for logging. (#71) * init * More * fix kazoo * Fixes * ch_backup/logging.py * Changes * Review changes * Fixes --- ch_backup/backup/deduplication.py | 6 +- ch_backup/backup/layout.py | 32 ++--- ch_backup/ch_backup.py | 12 +- ch_backup/cli.py | 8 +- ch_backup/clickhouse/client.py | 2 +- ch_backup/clickhouse/control.py | 6 +- ch_backup/clickhouse/disks.py | 22 ++-- ch_backup/config.py | 71 ++++------- ch_backup/logging.py | 116 +++++++++++++++--- ch_backup/logic/database.py | 4 +- ch_backup/logic/table.py | 38 +++--- ch_backup/logic/udf.py | 8 +- .../async_pipeline/base_pipeline/exec_pool.py | 6 +- ch_backup/storage/pipeline.py | 4 +- ch_backup/util.py | 2 +- ch_backup/zookeeper/zookeeper.py | 11 +- requirements.txt | 1 + 17 files changed, 202 insertions(+), 147 deletions(-) diff --git a/ch_backup/backup/deduplication.py b/ch_backup/backup/deduplication.py index e50010a0..84ff904b 100644 --- a/ch_backup/backup/deduplication.py +++ b/ch_backup/backup/deduplication.py @@ -229,7 +229,7 @@ def deduplicate_part( """ part_name = fpart.name - logging.debug('Looking for deduplication of part "%s"', part_name) + logging.debug('Looking for deduplication of part "{}"', part_name) existing_part = dedup_info.get(part_name) if not existing_part: @@ -253,14 +253,14 @@ def deduplicate_part( if not existing_part.verified: if not layout.check_data_part(existing_part.backup_path, part): logging.debug( - 'Part "%s" found in "%s", but it\'s invalid, skipping', + 'Part "{}" found in "{}", but it\'s invalid, skipping', part_name, existing_part.backup_path, ) return None logging.debug( - 'Part "%s" found in "%s", reusing', part_name, existing_part.backup_path + 'Part "{}" found in "{}", reusing', part_name, existing_part.backup_path ) return part diff --git a/ch_backup/backup/layout.py b/ch_backup/backup/layout.py index 99f850b1..22490a44 100644 --- a/ch_backup/backup/layout.py +++ b/ch_backup/backup/layout.py @@ -45,11 +45,11 @@ def upload_backup_metadata(self, backup: BackupMetadata) -> None: remote_path = self._backup_metadata_path(backup.name) remote_light_path = self._backup_light_metadata_path(backup.name) try: - logging.debug("Saving backup metadata in %s", remote_path) + logging.debug("Saving backup metadata in {}", remote_path) self._storage_loader.upload_data( backup.dump_json(light=False), remote_path=remote_path ) - logging.debug("Saving backup light metadata in %s", remote_light_path) + logging.debug("Saving backup light metadata in {}", remote_light_path) self._storage_loader.upload_data( backup.dump_json(light=True), remote_path=remote_light_path ) @@ -66,7 +66,7 @@ def upload_database_create_statement(self, backup_name: str, db: Database) -> No remote_path = _db_metadata_path(self.get_backup_path(backup_name), db.name) try: logging.debug( - 'Uploading metadata (create statement) for database "%s"', db.name + 'Uploading metadata (create statement) for database "{}"', db.name ) self._storage_loader.upload_file( local_path, remote_path=remote_path, encryption=True @@ -88,7 +88,7 @@ def upload_table_create_statement( ) try: logging.debug( - 'Uploading metadata (create statement) for table "%s"."%s"', + 'Uploading metadata (create statement) for table "{}"."{}"', db.name, table.name, ) @@ -108,7 +108,7 @@ def upload_access_control_file(self, backup_name: str, file_name: str) -> None: 
self.get_backup_path(backup_name), file_name ) try: - logging.debug('Uploading access control data "%s"', local_path) + logging.debug('Uploading access control data "{}"', local_path) self._storage_loader.upload_file( local_path=local_path, remote_path=remote_path, encryption=True ) @@ -127,7 +127,7 @@ def upload_access_control_files( self.get_backup_path(backup_name), ACCESS_CONTROL_FNAME ) try: - logging.debug('Uploading access control data "%s"', local_path) + logging.debug('Uploading access control data "{}"', local_path) self._storage_loader.upload_files_tarball( self._access_control_path, file_names, remote_path, encryption=True ) @@ -154,7 +154,7 @@ def upload_data_part(self, backup_name: str, fpart: FrozenPart) -> None: Upload part data. """ logging.debug( - 'Uploading data part %s of "%s"."%s"', + 'Uploading data part {} of "{}"."{}"', fpart.name, fpart.database, fpart.table, @@ -255,7 +255,7 @@ def get_backups(self, use_light_meta: bool = False) -> List[BackupMetadata]: Return list of existing backups sorted by start_time in descent order. """ logging.debug( - "Collecting %s of existing backups", + "Collecting {} of existing backups", "light metadata" if use_light_meta else "metadata", ) @@ -322,7 +322,7 @@ def download_access_control_file(self, backup_name: str, file_name: str) -> None ) local_path = os.path.join(self._access_control_path, file_name) logging.debug( - 'Downloading access control metadata "%s" to "%s', remote_path, local_path + 'Downloading access control metadata "{}" to "{}', remote_path, local_path ) try: self._storage_loader.download_file(remote_path, local_path, encryption=True) @@ -339,7 +339,7 @@ def download_access_control(self, backup_name: str) -> None: ) local_path = self._access_control_path logging.debug( - 'Downloading access control metadata "%s" to "%s', remote_path, local_path + 'Downloading access control metadata "{}" to "{}', remote_path, local_path ) try: self._storage_loader.download_files( @@ -356,7 +356,7 @@ def download_data_part( Download part data to the specified directory. 
""" logging.debug( - 'Downloading data part %s of "%s"."%s"', + 'Downloading data part {} of "{}"."{}"', part.name, part.database, part.table, @@ -370,7 +370,7 @@ def download_data_part( if part.tarball: remote_path = os.path.join(remote_dir_path, f"{part.name}.tar") - logging.debug("Downloading part tarball file: %s", remote_path) + logging.debug("Downloading part tarball file: {}", remote_path) try: self._storage_loader.download_files( remote_path=remote_path, @@ -386,7 +386,7 @@ def download_data_part( local_path = os.path.join(fs_part_path, filename) remote_path = os.path.join(remote_dir_path, filename) try: - logging.debug("Downloading part file: %s", remote_path) + logging.debug("Downloading part file: {}", remote_path) self._storage_loader.download_file( remote_path=remote_path, local_path=local_path, @@ -422,7 +422,7 @@ def check_data_part(self, backup_path: str, part: PartMetadata) -> bool: notfound_files = set(part.files) - set(remote_files) if notfound_files: logging.warning( - "Some part files were not found in %s: %s", + "Some part files were not found in {}: {}", remote_dir_path, ", ".join(notfound_files), ) @@ -469,7 +469,7 @@ def delete_backup(self, backup_name: str) -> None: """ backup_path = self.get_backup_path(backup_name) - logging.debug("Deleting data in %s", backup_path) + logging.debug("Deleting data in {}", backup_path) deleting_files = self._storage_loader.list_dir( backup_path, recursive=True, absolute=True @@ -490,7 +490,7 @@ def delete_data_parts( part_path = _part_path( part.link or backup_meta.path, part.database, part.table, part.name ) - logging.debug("Deleting data part %s", part_path) + logging.debug("Deleting data part {}", part_path) if part.tarball: deleting_files.append(os.path.join(part_path, f"{part.name}.tar")) else: diff --git a/ch_backup/ch_backup.py b/ch_backup/ch_backup.py index 713ace20..de63e05e 100644 --- a/ch_backup/ch_backup.py +++ b/ch_backup/ch_backup.py @@ -165,7 +165,7 @@ def backup( self._context.backup_meta ) logging.debug( - 'Starting backup "%s" for databases: %s', + 'Starting backup "{}" for databases: {}', self._context.backup_meta.name, ", ".join(map(lambda db: db.name, databases)), ) @@ -273,7 +273,7 @@ def restore( ] if missed_databases: logging.critical( - "Required databases %s were not found in backup metadata: %s", + "Required databases {} were not found in backup metadata: {}", ", ".join(missed_databases), self._context.backup_meta.path, ) @@ -360,13 +360,13 @@ def purge(self) -> Tuple[Sequence[str], Optional[str]]: with self._context.locker(): for backup in self._context.backup_layout.get_backups(use_light_meta=False): if backup.name not in backup_names: - logging.info("Deleting backup without metadata: %s", backup.name) + logging.info("Deleting backup without metadata: {}", backup.name) self._context.backup_layout.delete_backup(backup.name) continue if retain_count > 0: logging.info( - "Preserving backup per retain count policy: %s, state %s", + "Preserving backup per retain count policy: {}, state {}", backup.name, backup.state, ) @@ -377,7 +377,7 @@ def purge(self) -> Tuple[Sequence[str], Optional[str]]: if retain_time_limit and backup.start_time >= retain_time_limit: logging.info( - "Preserving backup per retain time policy: %s, state %s", + "Preserving backup per retain time policy: {}, state {}", backup.name, backup.state, ) @@ -413,7 +413,7 @@ def _delete( self, backup: BackupMetadata, dedup_references: DedupReferences ) -> Tuple[Optional[str], Optional[str]]: logging.info( - "Deleting backup %s, state: %s", + 
"Deleting backup {}, state: {}", backup.name, backup.state, ) diff --git a/ch_backup/cli.py b/ch_backup/cli.py index 17f33dc2..4cad8872 100755 --- a/ch_backup/cli.py +++ b/ch_backup/cli.py @@ -127,7 +127,7 @@ def cli( if zk_hosts is not None: cfg["zookeeper"]["hosts"] = zk_hosts - logging.configure(cfg["logging"]) + logging.configure(cfg["loguru"]) setup_environment(cfg["main"]) if not drop_privileges(cfg["main"]): @@ -149,7 +149,7 @@ def decorator(f): def wrapper(ctx, *args, **kwargs): try: logging.info( - "Executing command '%s', params: %s, args: %s, version: %s", + "Executing command '{}', params: {}, args: {}, version: {}", ctx.command.name, { **ctx.parent.params, @@ -159,10 +159,10 @@ def wrapper(ctx, *args, **kwargs): get_version(), ) result = ctx.invoke(f, ctx, ctx.obj["backup"], *args, **kwargs) - logging.info("Command '%s' completed", ctx.command.name) + logging.info("Command '{}' completed", ctx.command.name) return result except (Exception, TerminatingSignal): - logging.exception("Command '%s' failed", ctx.command.name) + logging.exception("Command '{}' failed", ctx.command.name) raise return cli.command(*args, **kwargs)(wrapper) diff --git a/ch_backup/clickhouse/client.py b/ch_backup/clickhouse/client.py index d292e2dc..aeded6d5 100644 --- a/ch_backup/clickhouse/client.py +++ b/ch_backup/clickhouse/client.py @@ -48,7 +48,7 @@ def query( Execute query. """ try: - logging.debug("Executing query: %s", query) + logging.debug("Executing query: {}", query) if timeout is None: timeout = self.timeout diff --git a/ch_backup/clickhouse/control.py b/ch_backup/clickhouse/control.py index 57f8b756..a0ab8bdd 100644 --- a/ch_backup/clickhouse/control.py +++ b/ch_backup/clickhouse/control.py @@ -400,14 +400,14 @@ def remove_freezed_data(self) -> None: for disk in self._disks.values(): if disk.type == "local": shadow_path = os.path.join(disk.path, "shadow") - logging.debug("Removing shadow data: %s", shadow_path) + logging.debug("Removing shadow data: {}", shadow_path) self._remove_shadow_data(shadow_path) def remove_freezed_part(self, part: FrozenPart) -> None: """ Remove the freezed part. """ - logging.debug("Removing freezed part: %s", part.path) + logging.debug("Removing freezed part: {}", part.path) self._remove_shadow_data(part.path) def get_databases( @@ -628,7 +628,7 @@ def list_frozen_parts( path = os.path.join(disk.path, "shadow", backup_name, table_relative_path) if not os.path.exists(path): - logging.debug("Shadow path %s is empty", path) + logging.debug("Shadow path {} is empty", path) return [] freezed_parts: List[FrozenPart] = [] diff --git a/ch_backup/clickhouse/disks.py b/ch_backup/clickhouse/disks.py index be157b77..c2a4e6ee 100644 --- a/ch_backup/clickhouse/disks.py +++ b/ch_backup/clickhouse/disks.py @@ -2,7 +2,7 @@ Clickhouse-disks controls temporary cloud storage disks management. 
""" -import logging + import os from subprocess import PIPE, Popen from typing import Dict, List, Optional @@ -10,7 +10,7 @@ import xmltodict -import ch_backup.logging as ch_logging +from ch_backup import logging from ch_backup.backup.layout import BackupLayout from ch_backup.backup.metadata import BackupMetadata, PartMetadata from ch_backup.clickhouse.config import ClickhouseConfig @@ -78,13 +78,13 @@ def __enter__(self): def __exit__(self, exc_type, *args, **kwargs): if exc_type is not None: - ch_logging.warning( + logging.warning( f'Omitting tmp cloud storage disk cleanup due to exception: "{exc_type}"' ) return False for disk in self._created_disks.values(): - ch_logging.debug(f"Removing tmp disk {disk.name}") + logging.debug(f"Removing tmp disk {disk.name}") try: os.remove(_get_config_path(self._config_dir, disk.name)) return True @@ -100,7 +100,7 @@ def _create_temporary_disk( source_endpoint: str, ) -> None: tmp_disk_name = _get_tmp_disk_name(disk.name) - ch_logging.debug(f"Creating tmp disk {tmp_disk_name}") + logging.debug(f"Creating tmp disk {tmp_disk_name}") disk_config = self._ch_config.config["storage_configuration"]["disks"][ disk.name ] @@ -129,9 +129,7 @@ def _create_temporary_disk( self._ch_ctl.reload_config() source_disk = self._ch_ctl.get_disk(tmp_disk_name) - ch_logging.debug( - f'Restoring Cloud Storage "shadow" data of disk "{disk.name}"' - ) + logging.debug(f'Restoring Cloud Storage "shadow" data of disk "{disk.name}"') self._backup_layout.download_cloud_storage_metadata( backup_meta, source_disk, disk.name ) @@ -180,23 +178,23 @@ def _copy_dir(from_disk: str, from_path: str, to_disk: str, to_path: str) -> Non common_args=[], command_args=["--diskFrom", from_disk, "--diskTo", to_disk, from_path, to_path], ) - ch_logging.warning(f"clickhouse-disks copy result: {os.linesep.join(result)}") + logging.warning(f"clickhouse-disks copy result: {os.linesep.join(result)}") def _exec(command: str, common_args: List[str], command_args: List[str]) -> List[str]: - logger = logging.getLogger("clickhouse-disks") + ch_disks_logger = logging.getLogger("clickhouse-disks") command_args = [ "/usr/bin/clickhouse-disks", *common_args, command, *command_args, ] - ch_logging.debug(f'Executing "{" ".join(command_args)}"') + logging.debug(f'Executing "{" ".join(command_args)}"') with Popen(command_args, stdout=PIPE, stderr=PIPE, shell=False) as proc: while proc.poll() is None: for line in proc.stderr.readlines(): # type: ignore - logger.info(line.decode("utf-8").strip()) + ch_disks_logger.info(line.decode("utf-8").strip()) if proc.returncode != 0: raise ClickHouseDisksException( f"clickhouse-disks call failed with exitcode: {proc.returncode}" diff --git a/ch_backup/config.py b/ch_backup/config.py index 20c5b1d4..7d1b6e6e 100644 --- a/ch_backup/config.py +++ b/ch_backup/config.py @@ -8,8 +8,7 @@ import yaml from humanfriendly import parse_size, parse_timespan - -from ch_backup import logging +from loguru import logger def _as_seconds(t: str) -> int: @@ -153,62 +152,40 @@ def _as_seconds(t: str) -> int: "ca_bundle": [], "disable_ssl_warnings": False, }, - "logging": { - "version": 1, + "loguru": { "formatters": { - "ch-backup": { - "format": "%(asctime)s %(processName)-11s %(process)-5d [%(levelname)-8s] %(name)s: %(message)s", - }, - "boto": { - "format": "%(asctime)s %(processName)-11s %(process)-5d [%(levelname)-8s] %(name)s: %(message)s", - }, + "ch-backup": "{time:YYYY-MM-DD H:m:s,SSS} {process.name:11} {process.id:5} [{level:8}] {extra[logger_name]}: {message}", }, "handlers": { 
"ch-backup": { - "class": "logging.FileHandler", - "filename": "/var/log/ch-backup/ch-backup.log", - "formatter": "ch-backup", - }, - "boto": { - "class": "logging.FileHandler", - "filename": "/var/log/ch-backup/boto.log", - "formatter": "boto", - }, - "clickhouse-disks": { - "class": "logging.FileHandler", - "filename": "/var/log/ch-backup/clickhouse-disks.log", - "formatter": "ch-backup", - }, - }, - "loggers": { - "ch-backup": { - "handlers": ["ch-backup"], + "sink": "/var/log/ch-backup/ch-backup.log", "level": "DEBUG", + "format": "ch-backup", }, - "botocore": { - "handlers": ["boto"], - "level": "INFO", - }, - "botocore.endpoint": { + "zookeeper": { + "sink": "/var/log/ch-backup/ch-backup.log", "level": "DEBUG", + "format": "ch-backup", }, - "botocore.vendored.requests": { - "level": "DEBUG", + "botocore": { + "sink": "/var/log/ch-backup/boto.log", + "format": "ch-backup", + "filter": { + "botocore": "INFO", + "botocore.endpoint": "DEBUG", + "botocore.vendored.requests": "DEBUG", + "botocore.parsers": "DEBUG", + }, }, - "botocore.parsers": { + "clickhouse-disks": { + "sink": "/var/log/ch-backup/clickhouse-disks.log", "level": "DEBUG", + "format": "ch-backup", }, "urllib3.connectionpool": { - "handlers": ["boto"], - "level": "DEBUG", - }, - "clickhouse-disks": { - "handlers": ["clickhouse-disks"], - "level": "INFO", - }, - "zookeeper": { - "handlers": ["ch-backup"], + "sink": "/var/log/ch-backup/boto.log", "level": "DEBUG", + "format": "ch-backup", }, }, }, @@ -272,14 +249,14 @@ def __getitem__(self, item): try: return self._conf[item] except KeyError: - logging.critical('Config item "%s" was not defined', item) + logger.critical('Config item "{}" was not defined', item) raise def __setitem__(self, item, value): try: self._conf[item] = value except KeyError: - logging.critical('Config item "%s" was not defined', item) + logger.critical('Config item "{}" was not defined', item) raise def get(self, key: str, default: Any = None) -> Any: diff --git a/ch_backup/logging.py b/ch_backup/logging.py index 0f80e21b..6049cd83 100644 --- a/ch_backup/logging.py +++ b/ch_backup/logging.py @@ -2,67 +2,142 @@ Logging module. """ +import inspect import logging -import logging.config -import os +from typing import Any import psutil +from loguru import logger from ch_backup.util import format_size -def configure(config: dict) -> None: +class Filter: """ - Configure logging. + Filter for luguru handler. """ - for handler in config.get("handlers", {}).values(): - filename = handler.get("filename") - if filename: - os.makedirs(os.path.dirname(filename), exist_ok=True) - logging.config.dictConfig(config) + def __init__(self, name): + self._name = name + + def __call__(self, record): + """ + Filter callback to decide for each logged message whether it should be sent to the sink or not. + """ + + return record["extra"].get("logger_name") == self._name + + +def make_filter(name): + """ + Factory for filter creation. + """ + + return Filter(name) + + +class InterceptHandler(logging.Handler): + """ + Helper class for logging interception. + """ + + def emit(self, record: logging.LogRecord) -> None: + """ + Intercept all records from the logging module and redirect them into loguru. + + The handler for loguru will be chosen based on module name. + """ + + # Get corresponding Loguru level if it exists. + level: int or str # type: ignore + try: + level = logger.level(record.levelname).name + except ValueError: + level = record.levelno + + # Find caller from where originated the logged message. 
+ frame, depth = inspect.currentframe(), 0 + while frame and (depth == 0 or frame.f_code.co_filename == logging.__file__): + frame = frame.f_back + depth += 1 + + logger.bind(logger_name=record.name).opt( + depth=depth, exception=record.exc_info + ).log(level, record.getMessage()) + + +def configure(config_loguru: dict) -> None: + """ + Configure logger. + """ + # Configure loguru. + loguru_handlers = [] + + for name, value in config_loguru["handlers"].items(): + handler = { + "sink": value["sink"], + "format": config_loguru["formatters"][value["format"]], + "enqueue": True, + "filter": value["filter"] if "filter" in value else make_filter(name), + } + if "level" in value: + handler["level"] = value["level"] + loguru_handlers.append(handler) + + logger.configure(handlers=loguru_handlers, activation=[("", True)]) + + # Configure logging. + logging.basicConfig(handlers=[InterceptHandler()], level=0) + logging.debug("Checkroot") def critical(msg, *args, **kwargs): """ Log a message with severity 'CRITICAL'. """ - _get_logger().critical(msg, *args, **kwargs) + with_exception = kwargs.get("exc_info", False) + getLogger("ch-backup").opt(exception=with_exception).critical(msg, *args, **kwargs) def error(msg, *args, **kwargs): """ Log a message with severity 'ERROR'. """ - _get_logger().error(msg, *args, **kwargs) + with_exception = kwargs.get("exc_info", False) + getLogger("ch-backup").opt(exception=with_exception).error(msg, *args, **kwargs) def exception(msg, *args, **kwargs): """ Log a message with severity 'ERROR' with exception information. """ - _get_logger().exception(msg, *args, **kwargs) + + with_exception = kwargs.get("exc_info", False) + getLogger("ch-backup").opt(exception=with_exception).debug(msg, *args, **kwargs) def warning(msg, *args, **kwargs): """ Log a message with severity 'WARNING'. """ - _get_logger().warning(msg, *args, **kwargs) + with_exception = kwargs.get("exc_info", False) + getLogger("ch-backup").opt(exception=with_exception).warning(msg, *args, **kwargs) def info(msg, *args, **kwargs): """ Log a message with severity 'INFO'. """ - _get_logger().info(msg, *args, **kwargs) + with_exception = kwargs.get("exc_info", False) + getLogger("ch-backup").opt(exception=with_exception).info(msg, *args, **kwargs) def debug(msg, *args, **kwargs): """ Log a message with severity 'DEBUG'. """ - _get_logger().debug(msg, *args, **kwargs) + with_exception = kwargs.get("exc_info", False) + getLogger("ch-backup").opt(exception=with_exception).debug(msg, *args, **kwargs) def memory_usage(): @@ -84,7 +159,7 @@ def memory_usage(): total_usage = main_proc_usage + worker_procs_usage debug( - "Memory usage: %s (main process: %s, worker processes: %s)", + "Memory usage: {} (main process: {}, worker processes: {})", format_size(total_usage), format_size(main_proc_usage), format_size(worker_procs_usage), @@ -94,5 +169,10 @@ def memory_usage(): warning("Unable to get memory usage", exc_info=True) -def _get_logger() -> logging.Logger: - return logging.getLogger("ch-backup") +# pylint: disable=invalid-name +def getLogger(name: str) -> Any: + """ + Get logger with specific name. 
+ """ + + return logger.bind(logger_name=name) diff --git a/ch_backup/logic/database.py b/ch_backup/logic/database.py index a23bd518..a13669e3 100644 --- a/ch_backup/logic/database.py +++ b/ch_backup/logic/database.py @@ -53,7 +53,7 @@ def restore(context: BackupContext, databases: Dict[str, Database]) -> None: databases_to_restore[name] = db continue - logging.info("Restoring databases: %s", ", ".join(databases_to_restore.keys())) + logging.info("Restoring databases: {}", ", ".join(databases_to_restore.keys())) for db in databases_to_restore.values(): if db.has_embedded_metadata(): db_sql = embedded_schema_db_sql(db) @@ -86,7 +86,7 @@ def _backup_database(context: BackupContext, db: Database) -> None: """ Backup database. """ - logging.debug('Performing database backup for "%s"', db.name) + logging.debug('Performing database backup for "{}"', db.name) if not db.has_embedded_metadata(): context.backup_layout.upload_database_create_statement( diff --git a/ch_backup/logic/table.py b/ch_backup/logic/table.py index c0b93849..6a2a034c 100644 --- a/ch_backup/logic/table.py +++ b/ch_backup/logic/table.py @@ -88,7 +88,7 @@ def _collect_local_metadata_mtime( mtime = self._get_mtime(table.metadata_path) if mtime is None: logging.warning( - 'Cannot get metadata mtime for table "%s"."%s". Skipping it', + 'Cannot get metadata mtime for table "{}"."{}". Skipping it', table.database, table.name, ) @@ -200,7 +200,7 @@ def restore( ] if missed_tables: logging.critical( - "Required tables %s were not found in backup metadata", + "Required tables {} were not found in backup metadata", ", ".join([f"{t.database}.{t.name}" for t in missed_tables]), ) raise ClickhouseBackupError( @@ -279,7 +279,7 @@ def _load_create_statement_from_disk(table: Table) -> Optional[bytes]: """ if not table.metadata_path: logging.debug( - 'Cannot load a create statement of the table "%s"."%s". Metadata is empty', + 'Cannot load a create statement of the table "{}"."{}". Metadata is empty', table.database, table.name, ) @@ -288,7 +288,7 @@ def _load_create_statement_from_disk(table: Table) -> Optional[bytes]: return Path(table.metadata_path).read_bytes() except OSError as e: logging.debug( - 'Cannot load a create statement of the table "%s"."%s": %s', + 'Cannot load a create statement of the table "{}"."{}": {}', table.database, table.name, str(e), @@ -311,14 +311,14 @@ def _backup_table( Return backup metadata of successfully backuped table, otherwise None. """ logging.debug( - 'Performing table backup for "%s"."%s"', table.database, table.name + 'Performing table backup for "{}"."{}"', table.database, table.name ) table_meta = TableMetadata(table.database, table.name, table.engine, table.uuid) create_statement = self._load_create_statement_from_disk(table) if not create_statement: logging.warning( - 'Skipping table backup for "%s"."%s". Local metadata is empty or absent', + 'Skipping table backup for "{}"."{}". Local metadata is empty or absent', db.name, table.name, ) @@ -333,7 +333,7 @@ def _backup_table( raise logging.warning( - 'Table "%s"."%s" was removed by a user during backup', + 'Table "{}"."{}" was removed by a user during backup', table.database, table.name, ) @@ -343,7 +343,7 @@ def _backup_table( new_mtime = self._get_mtime(table.metadata_path) if new_mtime is None or mtimes[table.name].mtime != new_mtime: logging.warning( - 'Skipping table backup for "%s"."%s". The metadata file was updated or removed during backup', + 'Skipping table backup for "{}"."{}". 
The metadata file was updated or removed during backup', table.database, table.name, ) @@ -375,13 +375,13 @@ def _backup_frozen_table_data( """ if not is_merge_tree(table.engine): logging.info( - 'Skipping table data backup for non MergeTree table "%s"."%s"', + 'Skipping table data backup for non MergeTree table "{}"."{}"', table.database, table.name, ) return - logging.debug('Uploading table data for "%s"."%s"', table.database, table.name) + logging.debug('Uploading table data for "{}"."{}"', table.database, table.name) uploaded_parts = [] for data_path, disk in table.paths_with_disks: @@ -390,7 +390,7 @@ def _backup_frozen_table_data( ) for fpart in freezed_parts: - logging.debug("Working on %s", fpart) + logging.debug("Working on {}", fpart) if disk.type == "s3": table_meta.add_part(PartMetadata.from_frozen_part(fpart)) @@ -470,7 +470,7 @@ def _preprocess_tables_to_restore( ): continue logging.warning( - 'Table "%s"."%s" will be recreated as its schema mismatches the schema from backup: "%s" != "%s"', + 'Table "{}"."{}" will be recreated as its schema mismatches the schema from backup: "{}" != "{}"', table.database, table.name, existing_table.create_statement, @@ -518,7 +518,7 @@ def _restore_tables( other_tables = [] for table in tables: logging.debug( - "Preparing table %s for restoring", f"{table.database}.{table.name}" + "Preparing table {} for restoring", f"{table.database}.{table.name}" ) self._rewrite_table_schema( context, databases[table.database], table, add_uuid_if_required=True @@ -540,7 +540,7 @@ def _restore_tables( ] logging.debug( - "Deleting replica metadata for replicated tables: %s", + "Deleting replica metadata for replicated tables: {}", ", ".join([f"{t.database}.{t.name}" for t in replicated_tables]), ) context.zk_ctl.delete_replica_metadata( @@ -567,7 +567,7 @@ def _restore_data( for table_meta in tables: try: logging.debug( - 'Running table "%s.%s" data restore', + 'Running table "{}.{}" data restore', table_meta.database, table_meta.name, ) @@ -631,7 +631,7 @@ def _restore_data( ) for part in attach_parts: logging.debug( - 'Attaching "%s.%s" part: %s', + 'Attaching "{}.{}" part: {}', table_meta.database, table.name, part.name, @@ -641,7 +641,7 @@ def _restore_data( context.restore_context.add_part(part, PartState.RESTORED) except Exception as e: logging.warning( - 'Attaching "%s.%s" part %s failed: %s', + 'Attaching "{}.{}" part {} failed: {}', table_meta.database, table.name, part.name, @@ -698,7 +698,7 @@ def _restore_table_objects( table = unprocessed.popleft() try: logging.debug( - "Trying to restore table object for table %s", + "Trying to restore table object for table {}", f"{table.database}.{table.name}", ) self._restore_table_object(context, databases[table.database], table) @@ -723,7 +723,7 @@ def _restore_table_objects( if errors: logging.error( - "Failed to restore tables:\n%s", + "Failed to restore tables:\n{}", "\n".join(f'"{v.database}"."{v.name}": {e!r}' for v, e in errors), ) diff --git a/ch_backup/logic/udf.py b/ch_backup/logic/udf.py index 84237bb2..84dd1670 100644 --- a/ch_backup/logic/udf.py +++ b/ch_backup/logic/udf.py @@ -23,7 +23,7 @@ def backup(self, context: BackupContext) -> None: for udf_name in udf.keys(): context.backup_meta.add_udf(udf_name) - logging.debug("Performing UDF backup for: %s", " ,".join(udf.keys())) + logging.debug("Performing UDF backup for: {}", " ,".join(udf.keys())) for udf_name, udf_statement in udf.items(): context.backup_layout.upload_udf( context.backup_meta.name, udf_name, udf_statement @@ -40,13 +40,13 @@ def 
restore(self, context: BackupContext) -> None: if not udf_list: return - logging.info("Restoring UDFs: %s", " ,".join(udf_list)) + logging.info("Restoring UDFs: {}", " ,".join(udf_list)) udf_on_clickhouse = context.ch_ctl.get_udf_query() udf_on_clickhouse_list = udf_on_clickhouse.keys() for udf_name in udf_list: - logging.debug("Restoring UDF %s", udf_name) + logging.debug("Restoring UDF {}", udf_name) statement = context.backup_layout.get_udf_create_statement( context.backup_meta, udf_name @@ -62,7 +62,7 @@ def restore(self, context: BackupContext) -> None: if udf_name not in udf_on_clickhouse_list: context.ch_ctl.restore_udf(statement) - logging.debug("UDF %s restored", udf_name) + logging.debug("UDF {} restored", udf_name) logging.info("All UDFs restored") diff --git a/ch_backup/storage/async_pipeline/base_pipeline/exec_pool.py b/ch_backup/storage/async_pipeline/base_pipeline/exec_pool.py index d189f141..cb6c071f 100644 --- a/ch_backup/storage/async_pipeline/base_pipeline/exec_pool.py +++ b/ch_backup/storage/async_pipeline/base_pipeline/exec_pool.py @@ -31,7 +31,7 @@ def submit(self, future_id: str, func: Callable, *args: Any, **kwargs: Any) -> N if future_id in self._futures: raise RuntimeError("Duplicate") future = self._pool.submit(func, *args, **kwargs) - future.add_done_callback(lambda _: logging.debug('Future "%s" completed', future_id)) # type: ignore[misc] + future.add_done_callback(lambda _: logging.debug("Future {} completed", future_id)) # type: ignore[misc] self._futures[future_id] = future def wait_all(self, keep_going: bool = False) -> None: @@ -49,13 +49,13 @@ def wait_all(self, keep_going: bool = False) -> None: except Exception: if keep_going: logging.warning( - 'Future "%s" generated an exception, skipping due to keep_going flag', + 'Future "{}" generated an exception, skipping due to keep_going flag', future_id, exc_info=True, ) continue logging.error( - 'Future "%s" generated an exception:', future_id, exc_info=True + 'Future "{}" generated an exception:', future_id, exc_info=True ) raise self._futures = {} diff --git a/ch_backup/storage/pipeline.py b/ch_backup/storage/pipeline.py index 6fce62d8..87764d9b 100644 --- a/ch_backup/storage/pipeline.py +++ b/ch_backup/storage/pipeline.py @@ -88,7 +88,7 @@ def submit(self, future_id: str, func: Callable, *args: Any, **kwargs: Any) -> N Schedule job for execution """ future = self._pool.submit(func, *args, **kwargs) - future.add_done_callback(logging.debug('Future "%s" completed', future_id)) + future.add_done_callback(logging.debug('Future "{}" completed', future_id)) self._futures[future_id] = future def wait_all(self) -> None: @@ -103,7 +103,7 @@ def wait_all(self) -> None: future.result() except Exception: logging.error( - 'Future "%s" generated an exception:', future_id, exc_info=True + 'Future "{}" generated an exception:', future_id, exc_info=True ) raise self._futures = {} diff --git a/ch_backup/util.py b/ch_backup/util.py index cb0b5334..f7ba3249 100644 --- a/ch_backup/util.py +++ b/ch_backup/util.py @@ -206,7 +206,7 @@ def retry( def _log_retry(retry_state): logging.debug( - "Retrying %s.%s in %.2fs, attempt: %s, reason: %r", + "Retrying {}.{} in {}, attempt: {}, reason: {}", retry_state.fn.__module__, retry_state.fn.__qualname__, retry_state.next_action.sleep, diff --git a/ch_backup/zookeeper/zookeeper.py b/ch_backup/zookeeper/zookeeper.py index 41ea2e09..ebed089c 100644 --- a/ch_backup/zookeeper/zookeeper.py +++ b/ch_backup/zookeeper/zookeeper.py @@ -1,8 +1,7 @@ """ ZooKeeper-control classes module """ - -import 
logging +import logging as py_logging import os from typing import Dict, Iterable, Tuple @@ -10,7 +9,7 @@ from kazoo.exceptions import KazooException, NoNodeError from kazoo.handlers.threading import KazooTimeoutError -from ch_backup.logging import debug +from ch_backup import logging from ..clickhouse.models import Table from ..util import retry @@ -32,7 +31,7 @@ def __init__(self, config: dict): certfile=config.get("cert"), keyfile=config.get("key"), ca=config.get("ca"), - logger=logging.getLogger("zookeeper"), + logger=py_logging.getLogger("zookeeper"), randomize_hosts=config.get("randomize_hosts", True), ) self._zk_user = config.get("user") @@ -102,7 +101,7 @@ def delete_replica_metadata( "replicas", replica, ) # remove leading '/' - debug(f'Deleting zk node: "{path}"') + logging.debug(f'Deleting zk node: "{path}"') try: client.delete(path, recursive=True) except NoNodeError: @@ -124,7 +123,7 @@ def delete_replicated_database_metadata( path = os.path.join( self._zk_root_path, zk_path[1:].format(**macros) ) # remove leading '/' - debug(f'Deleting zk node: "{path}"') + logging.debug(f'Deleting zk node: "{path}"') try: client.delete(path, recursive=True) except NoNodeError: diff --git a/requirements.txt b/requirements.txt index c2ac3a98..ba26cb5c 100644 --- a/requirements.txt +++ b/requirements.txt @@ -14,3 +14,4 @@ xmltodict pypeln==0.4.9 dataclasses>=0.7,<0.8; python_version <"3.7" # required for pypeln==0.4.9 typing_extensions>=3.7.4,<4.0; python_version <"3.8" # required for pypeln==0.4.9 +loguru
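
The core of this change is the bridge added in ch_backup/logging.py: records emitted through the stdlib logging module are captured by InterceptHandler and re-emitted through loguru, and each loguru handler picks its sink by the logger_name bound to the record, which is how third-party modules such as botocore, urllib3 and kazoo keep logging via logging while their output still lands in the per-component files configured under the new "loguru" section. This is also why every log call site switches from %-style placeholders ("%s") to loguru's brace-style ("{}"). A minimal standalone sketch of that mechanism follows; the stderr sink and the logger names used here are illustrative assumptions, not the sinks shipped in the default config above.

    # Sketch of the logging -> loguru bridge used by this patch.
    # Sink and logger names below are illustrative, not the packaged config.
    import inspect
    import logging
    import sys

    from loguru import logger


    class InterceptHandler(logging.Handler):
        """Redirect stdlib logging records into loguru, preserving level and caller."""

        def emit(self, record: logging.LogRecord) -> None:
            # Map the stdlib level name to a loguru level if one exists.
            try:
                level = logger.level(record.levelname).name
            except ValueError:
                level = record.levelno

            # Walk past the logging module frames so loguru reports the real caller.
            frame, depth = inspect.currentframe(), 0
            while frame and (depth == 0 or frame.f_code.co_filename == logging.__file__):
                frame = frame.f_back
                depth += 1

            logger.bind(logger_name=record.name).opt(
                depth=depth, exception=record.exc_info
            ).log(level, record.getMessage())


    def by_logger_name(name: str):
        """Route only records bound with the given logger_name to a sink."""
        return lambda record: record["extra"].get("logger_name") == name


    # One handler per component; the real config uses file sinks and a custom format.
    logger.configure(
        handlers=[
            {"sink": sys.stderr, "filter": by_logger_name("ch-backup"), "level": "DEBUG"},
        ]
    )
    # Route everything from the stdlib logging module through the interceptor.
    logging.basicConfig(handlers=[InterceptHandler()], level=0)

    logger.bind(logger_name="ch-backup").info("direct loguru call: {}", "brace formatting")
    logging.getLogger("ch-backup").info("stdlib call intercepted and re-routed")

With this setup, a call like logging.getLogger("ch-backup").info(...) and a direct ch_backup.logging.info(...) (which binds logger_name="ch-backup" internally) both end up in the same sink, while records bound to names without a matching handler are simply dropped by the filters.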