Loguru for logging. (#71)
* init

* More

* fix kazoo

* Fixes

* ch_backup/logging.py

* Changes

* Review changes

* Fixes
MikhailBurdukov authored Sep 27, 2023
1 parent 9fe14d8 commit 5b5e6c7
Showing 17 changed files with 202 additions and 147 deletions.
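Almost all of the changes below follow one pattern: printf-style `%s` placeholders used with the standard `logging` module are replaced by `{}` placeholders, which Loguru passes through `str.format()` when positional arguments are supplied. A minimal sketch of the before/after call style, shown with Loguru's own logger rather than the project's `ch_backup.logging` wrapper (the wrapper module itself is not part of the excerpt below, and the example values are made up):

```python
# Sketch only: illustrates the placeholder change using loguru directly.
# ch-backup routes these calls through its ch_backup.logging wrapper,
# which is not shown in this excerpt.
import logging as std_logging

from loguru import logger

part_name = "20230927_1_1_0"        # hypothetical example values
backup_path = "ch_backup/backup_1"

# Before: stdlib logging with printf-style placeholders.
std_logging.basicConfig(level=std_logging.DEBUG)
std_logging.debug('Part "%s" found in "%s", reusing', part_name, backup_path)

# After: loguru formats the message with str.format(), so placeholders are {}.
logger.debug('Part "{}" found in "{}", reusing', part_name, backup_path)
```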
6 changes: 3 additions & 3 deletions ch_backup/backup/deduplication.py
@@ -229,7 +229,7 @@ def deduplicate_part(
"""
part_name = fpart.name

-logging.debug('Looking for deduplication of part "%s"', part_name)
+logging.debug('Looking for deduplication of part "{}"', part_name)

existing_part = dedup_info.get(part_name)
if not existing_part:
@@ -253,14 +253,14 @@ def deduplicate_part(
if not existing_part.verified:
if not layout.check_data_part(existing_part.backup_path, part):
logging.debug(
'Part "%s" found in "%s", but it\'s invalid, skipping',
'Part "{}" found in "{}", but it\'s invalid, skipping',
part_name,
existing_part.backup_path,
)
return None

logging.debug(
'Part "%s" found in "%s", reusing', part_name, existing_part.backup_path
'Part "{}" found in "{}", reusing', part_name, existing_part.backup_path
)

return part
32 changes: 16 additions & 16 deletions ch_backup/backup/layout.py
@@ -45,11 +45,11 @@ def upload_backup_metadata(self, backup: BackupMetadata) -> None:
remote_path = self._backup_metadata_path(backup.name)
remote_light_path = self._backup_light_metadata_path(backup.name)
try:
logging.debug("Saving backup metadata in %s", remote_path)
logging.debug("Saving backup metadata in {}", remote_path)
self._storage_loader.upload_data(
backup.dump_json(light=False), remote_path=remote_path
)
logging.debug("Saving backup light metadata in %s", remote_light_path)
logging.debug("Saving backup light metadata in {}", remote_light_path)
self._storage_loader.upload_data(
backup.dump_json(light=True), remote_path=remote_light_path
)
@@ -66,7 +66,7 @@ def upload_database_create_statement(self, backup_name: str, db: Database) -> No
remote_path = _db_metadata_path(self.get_backup_path(backup_name), db.name)
try:
logging.debug(
-'Uploading metadata (create statement) for database "%s"', db.name
+'Uploading metadata (create statement) for database "{}"', db.name
)
self._storage_loader.upload_file(
local_path, remote_path=remote_path, encryption=True
@@ -88,7 +88,7 @@ def upload_table_create_statement(
)
try:
logging.debug(
-'Uploading metadata (create statement) for table "%s"."%s"',
+'Uploading metadata (create statement) for table "{}"."{}"',
db.name,
table.name,
)
@@ -108,7 +108,7 @@ def upload_access_control_file(self, backup_name: str, file_name: str) -> None:
self.get_backup_path(backup_name), file_name
)
try:
-logging.debug('Uploading access control data "%s"', local_path)
+logging.debug('Uploading access control data "{}"', local_path)
self._storage_loader.upload_file(
local_path=local_path, remote_path=remote_path, encryption=True
)
@@ -127,7 +127,7 @@ def upload_access_control_files(
self.get_backup_path(backup_name), ACCESS_CONTROL_FNAME
)
try:
-logging.debug('Uploading access control data "%s"', local_path)
+logging.debug('Uploading access control data "{}"', local_path)
self._storage_loader.upload_files_tarball(
self._access_control_path, file_names, remote_path, encryption=True
)
@@ -154,7 +154,7 @@ def upload_data_part(self, backup_name: str, fpart: FrozenPart) -> None:
Upload part data.
"""
logging.debug(
-'Uploading data part %s of "%s"."%s"',
+'Uploading data part {} of "{}"."{}"',
fpart.name,
fpart.database,
fpart.table,
@@ -255,7 +255,7 @@ def get_backups(self, use_light_meta: bool = False) -> List[BackupMetadata]:
Return list of existing backups sorted by start_time in descent order.
"""
logging.debug(
"Collecting %s of existing backups",
"Collecting {} of existing backups",
"light metadata" if use_light_meta else "metadata",
)

@@ -322,7 +322,7 @@ def download_access_control_file(self, backup_name: str, file_name: str) -> None
)
local_path = os.path.join(self._access_control_path, file_name)
logging.debug(
-'Downloading access control metadata "%s" to "%s', remote_path, local_path
+'Downloading access control metadata "{}" to "{}', remote_path, local_path
)
try:
self._storage_loader.download_file(remote_path, local_path, encryption=True)
@@ -339,7 +339,7 @@ def download_access_control(self, backup_name: str) -> None:
)
local_path = self._access_control_path
logging.debug(
-'Downloading access control metadata "%s" to "%s', remote_path, local_path
+'Downloading access control metadata "{}" to "{}', remote_path, local_path
)
try:
self._storage_loader.download_files(
@@ -356,7 +356,7 @@ def download_data_part(
Download part data to the specified directory.
"""
logging.debug(
-'Downloading data part %s of "%s"."%s"',
+'Downloading data part {} of "{}"."{}"',
part.name,
part.database,
part.table,
@@ -370,7 +370,7 @@

if part.tarball:
remote_path = os.path.join(remote_dir_path, f"{part.name}.tar")
logging.debug("Downloading part tarball file: %s", remote_path)
logging.debug("Downloading part tarball file: {}", remote_path)
try:
self._storage_loader.download_files(
remote_path=remote_path,
@@ -386,7 +386,7 @@ def download_data_part(
local_path = os.path.join(fs_part_path, filename)
remote_path = os.path.join(remote_dir_path, filename)
try:
logging.debug("Downloading part file: %s", remote_path)
logging.debug("Downloading part file: {}", remote_path)
self._storage_loader.download_file(
remote_path=remote_path,
local_path=local_path,
@@ -422,7 +422,7 @@ def check_data_part(self, backup_path: str, part: PartMetadata) -> bool:
notfound_files = set(part.files) - set(remote_files)
if notfound_files:
logging.warning(
"Some part files were not found in %s: %s",
"Some part files were not found in {}: {}",
remote_dir_path,
", ".join(notfound_files),
)
@@ -469,7 +469,7 @@ def delete_backup(self, backup_name: str) -> None:
"""
backup_path = self.get_backup_path(backup_name)

logging.debug("Deleting data in %s", backup_path)
logging.debug("Deleting data in {}", backup_path)

deleting_files = self._storage_loader.list_dir(
backup_path, recursive=True, absolute=True
@@ -490,7 +490,7 @@ def delete_data_parts(
part_path = _part_path(
part.link or backup_meta.path, part.database, part.table, part.name
)
logging.debug("Deleting data part %s", part_path)
logging.debug("Deleting data part {}", part_path)
if part.tarball:
deleting_files.append(os.path.join(part_path, f"{part.name}.tar"))
else:
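A side note on the `{}` style, not exercised by this diff itself: once a message is formatted with `str.format()`, any literal brace in a message that also takes arguments has to be doubled, which was never a concern with `%`-style formatting. A small illustration, again assuming the plain Loguru logger and made-up paths (with current Loguru releases, a message with no arguments is logged verbatim):

```python
# Sketch of a str.format() migration gotcha; the paths here are made up.
from loguru import logger

# With positional arguments the message goes through str.format(),
# so literal braces have to be escaped by doubling them.
logger.debug("Part path template is {{disk_path}}/shadow/{}", "backup_1")

# With no arguments loguru logs the message verbatim, so braces are safe as-is.
logger.debug("Part path template is {disk_path}/shadow/<backup_name>")
```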
12 changes: 6 additions & 6 deletions ch_backup/ch_backup.py
@@ -165,7 +165,7 @@ def backup(
self._context.backup_meta
)
logging.debug(
-'Starting backup "%s" for databases: %s',
+'Starting backup "{}" for databases: {}',
self._context.backup_meta.name,
", ".join(map(lambda db: db.name, databases)),
)
@@ -273,7 +273,7 @@ def restore(
]
if missed_databases:
logging.critical(
"Required databases %s were not found in backup metadata: %s",
"Required databases {} were not found in backup metadata: {}",
", ".join(missed_databases),
self._context.backup_meta.path,
)
@@ -360,13 +360,13 @@ def purge(self) -> Tuple[Sequence[str], Optional[str]]:
with self._context.locker():
for backup in self._context.backup_layout.get_backups(use_light_meta=False):
if backup.name not in backup_names:
logging.info("Deleting backup without metadata: %s", backup.name)
logging.info("Deleting backup without metadata: {}", backup.name)
self._context.backup_layout.delete_backup(backup.name)
continue

if retain_count > 0:
logging.info(
"Preserving backup per retain count policy: %s, state %s",
"Preserving backup per retain count policy: {}, state {}",
backup.name,
backup.state,
)
@@ -377,7 +377,7 @@ def purge(self) -> Tuple[Sequence[str], Optional[str]]:

if retain_time_limit and backup.start_time >= retain_time_limit:
logging.info(
"Preserving backup per retain time policy: %s, state %s",
"Preserving backup per retain time policy: {}, state {}",
backup.name,
backup.state,
)
@@ -413,7 +413,7 @@ def _delete(
self, backup: BackupMetadata, dedup_references: DedupReferences
) -> Tuple[Optional[str], Optional[str]]:
logging.info(
"Deleting backup %s, state: %s",
"Deleting backup {}, state: {}",
backup.name,
backup.state,
)
8 changes: 4 additions & 4 deletions ch_backup/cli.py
@@ -127,7 +127,7 @@ def cli(
if zk_hosts is not None:
cfg["zookeeper"]["hosts"] = zk_hosts

logging.configure(cfg["logging"])
logging.configure(cfg["loguru"])
setup_environment(cfg["main"])

if not drop_privileges(cfg["main"]):
@@ -149,7 +149,7 @@ def decorator(f):
def wrapper(ctx, *args, **kwargs):
try:
logging.info(
"Executing command '%s', params: %s, args: %s, version: %s",
"Executing command '{}', params: {}, args: {}, version: {}",
ctx.command.name,
{
**ctx.parent.params,
@@ -159,10 +159,10 @@ def wrapper(ctx, *args, **kwargs):
get_version(),
)
result = ctx.invoke(f, ctx, ctx.obj["backup"], *args, **kwargs)
logging.info("Command '%s' completed", ctx.command.name)
logging.info("Command '{}' completed", ctx.command.name)
return result
except (Exception, TerminatingSignal):
logging.exception("Command '%s' failed", ctx.command.name)
logging.exception("Command '{}' failed", ctx.command.name)
raise

return cli.command(*args, **kwargs)(wrapper)
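`cli.py` now pulls its logging settings from a `loguru` section of the config instead of the old `logging` section. The wrapper's `configure()` lives in `ch_backup/logging.py`, which is not shown in this excerpt; the sketch below is only an assumed shape for such a function, and the config keys (`handlers`, `sink`, `level`, `format`) are illustrative rather than the project's actual schema:

```python
# Hypothetical configure() built on loguru; not the project's real implementation.
import sys
from typing import Any, Dict

from loguru import logger


def configure(config: Dict[str, Any]) -> None:
    """Replace loguru's default sink with sinks described by the config mapping."""
    logger.remove()  # drop the default stderr sink loguru installs at import time
    for handler in config.get("handlers", []):
        logger.add(
            handler.get("sink", sys.stderr),
            level=handler.get("level", "INFO"),
            format=handler.get("format", "{time} {level} {message}"),
        )


# Example usage with the assumed config shape:
configure({"handlers": [{"sink": "ch-backup.log", "level": "DEBUG"}]})
```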
2 changes: 1 addition & 1 deletion ch_backup/clickhouse/client.py
@@ -48,7 +48,7 @@ def query(
Execute query.
"""
try:
logging.debug("Executing query: %s", query)
logging.debug("Executing query: {}", query)

if timeout is None:
timeout = self.timeout
6 changes: 3 additions & 3 deletions ch_backup/clickhouse/control.py
@@ -400,14 +400,14 @@ def remove_freezed_data(self) -> None:
for disk in self._disks.values():
if disk.type == "local":
shadow_path = os.path.join(disk.path, "shadow")
logging.debug("Removing shadow data: %s", shadow_path)
logging.debug("Removing shadow data: {}", shadow_path)
self._remove_shadow_data(shadow_path)

def remove_freezed_part(self, part: FrozenPart) -> None:
"""
Remove the freezed part.
"""
logging.debug("Removing freezed part: %s", part.path)
logging.debug("Removing freezed part: {}", part.path)
self._remove_shadow_data(part.path)

def get_databases(
@@ -628,7 +628,7 @@ def list_frozen_parts(
path = os.path.join(disk.path, "shadow", backup_name, table_relative_path)

if not os.path.exists(path):
logging.debug("Shadow path %s is empty", path)
logging.debug("Shadow path {} is empty", path)
return []

freezed_parts: List[FrozenPart] = []
22 changes: 10 additions & 12 deletions ch_backup/clickhouse/disks.py
@@ -2,15 +2,15 @@
Clickhouse-disks controls temporary cloud storage disks management.
"""

-import logging

import os
from subprocess import PIPE, Popen
from typing import Dict, List, Optional
from urllib.parse import urlparse

import xmltodict

-import ch_backup.logging as ch_logging
+from ch_backup import logging
from ch_backup.backup.layout import BackupLayout
from ch_backup.backup.metadata import BackupMetadata, PartMetadata
from ch_backup.clickhouse.config import ClickhouseConfig
@@ -78,13 +78,13 @@ def __enter__(self):

def __exit__(self, exc_type, *args, **kwargs):
if exc_type is not None:
-ch_logging.warning(
+logging.warning(
f'Omitting tmp cloud storage disk cleanup due to exception: "{exc_type}"'
)
return False

for disk in self._created_disks.values():
ch_logging.debug(f"Removing tmp disk {disk.name}")
logging.debug(f"Removing tmp disk {disk.name}")
try:
os.remove(_get_config_path(self._config_dir, disk.name))
return True
@@ -100,7 +100,7 @@ def _create_temporary_disk(
source_endpoint: str,
) -> None:
tmp_disk_name = _get_tmp_disk_name(disk.name)
ch_logging.debug(f"Creating tmp disk {tmp_disk_name}")
logging.debug(f"Creating tmp disk {tmp_disk_name}")
disk_config = self._ch_config.config["storage_configuration"]["disks"][
disk.name
]
@@ -129,9 +129,7 @@ def _create_temporary_disk(

self._ch_ctl.reload_config()
source_disk = self._ch_ctl.get_disk(tmp_disk_name)
-ch_logging.debug(
-f'Restoring Cloud Storage "shadow" data of disk "{disk.name}"'
-)
+logging.debug(f'Restoring Cloud Storage "shadow" data of disk "{disk.name}"')
self._backup_layout.download_cloud_storage_metadata(
backup_meta, source_disk, disk.name
)
@@ -180,23 +178,23 @@ def _copy_dir(from_disk: str, from_path: str, to_disk: str, to_path: str) -> Non
common_args=[],
command_args=["--diskFrom", from_disk, "--diskTo", to_disk, from_path, to_path],
)
ch_logging.warning(f"clickhouse-disks copy result: {os.linesep.join(result)}")
logging.warning(f"clickhouse-disks copy result: {os.linesep.join(result)}")


def _exec(command: str, common_args: List[str], command_args: List[str]) -> List[str]:
-logger = logging.getLogger("clickhouse-disks")
+ch_disks_logger = logging.getLogger("clickhouse-disks")
command_args = [
"/usr/bin/clickhouse-disks",
*common_args,
command,
*command_args,
]
-ch_logging.debug(f'Executing "{" ".join(command_args)}"')
+logging.debug(f'Executing "{" ".join(command_args)}"')

with Popen(command_args, stdout=PIPE, stderr=PIPE, shell=False) as proc:
while proc.poll() is None:
for line in proc.stderr.readlines(): # type: ignore
-logger.info(line.decode("utf-8").strip())
+ch_disks_logger.info(line.decode("utf-8").strip())
if proc.returncode != 0:
raise ClickHouseDisksException(
f"clickhouse-disks call failed with exitcode: {proc.returncode}"
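`_exec()` keeps a dedicated named logger for streaming `clickhouse-disks` stderr output, but it now comes from `ch_backup.logging` rather than the standard library. Loguru itself has no `getLogger()`, so the wrapper presumably emulates one; the sketch below is purely an assumption about how that could look (the real `ch_backup/logging.py` is not in this excerpt), using `logger.bind()` to attach the name to each record:

```python
# Assumed sketch: emulating named loggers on top of loguru with bind().
# The actual ch_backup.logging.getLogger() may be implemented differently.
from loguru import logger


def getLogger(name: str):
    """Return a loguru logger that carries the given name in its bound context."""
    return logger.bind(logger_name=name)


# Usage mirroring _exec(): forward subprocess stderr lines to the named logger.
ch_disks_logger = getLogger("clickhouse-disks")
ch_disks_logger.info("clickhouse-disks copy completed")
```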
(Diffs for the remaining changed files were not loaded in this view.)
