Upload backup metadata during backup
aalexfvk committed Oct 25, 2023
1 parent 934a4b4 commit 8d1f124
Showing 8 changed files with 257 additions and 47 deletions.
7 changes: 5 additions & 2 deletions ch_backup/backup/layout.py
@@ -4,7 +4,7 @@

import os
from pathlib import Path
from typing import List, Optional, Sequence
from typing import Callable, List, Optional, Sequence
from urllib.parse import quote

from ch_backup import logging
@@ -151,7 +151,9 @@ def upload_udf(self, backup_name: str, file_name: str, metadata: str) -> None:
msg = f'Failed to upload udf metadata "{remote_path}"'
raise StorageError(msg) from e

def upload_data_part(self, backup_name: str, fpart: FrozenPart) -> None:
def upload_data_part(
self, backup_name: str, fpart: FrozenPart, callback: Callable
) -> None:
"""
Upload part data.
"""
@@ -174,6 +176,7 @@ def upload_data_part(self, backup_name: str, fpart: FrozenPart) -> None:
is_async=True,
encryption=True,
delete=True,
callback=callback,
)
except Exception as e:
msg = f"Failed to create async upload of {remote_path}"
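
The new callback argument is forwarded through upload_files_tarball into the async pipeline, so callers learn when the part upload actually finishes. A minimal illustrative callback, assuming layout, backup_meta and fpart already exist (those names are placeholders, not part of the commit):

from typing import Optional

def on_part_uploaded(exception: Optional[Exception] = None) -> None:
    # Called by the upload pipeline once the part's tarball upload finishes;
    # `exception` is set when the upload failed (see the ExecPool changes below).
    if exception is None:
        print("part uploaded")

layout.upload_data_part(backup_meta.name, fpart, on_part_uploaded)
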
1 change: 1 addition & 0 deletions ch_backup/config.py
@@ -64,6 +64,7 @@ def _as_seconds(t: str) -> int:
"restore_context_path": "/tmp/ch_backup_restore_state.json", # nosec
"validate_part_after_upload": False,
"restore_fail_on_attach_error": False,
"update_metadata_interval": _as_seconds("30 min"),
},
"storage": {
"type": "s3",
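
The default for the new update_metadata_interval key is produced by the existing _as_seconds helper, whose implementation is not part of this diff. A rough stand-in, shown only to make the default concrete (this sketch is an assumption; the real helper may differ):

_UNITS = {"sec": 1, "min": 60, "hour": 3600, "day": 86400}

def _as_seconds(t: str) -> int:
    # Hypothetical parser for strings like "30 min"; ch-backup's real helper
    # in config.py may accept other spellings.
    value, unit = t.split()
    return int(value) * _UNITS[unit.rstrip("s")]

assert _as_seconds("30 min") == 1800  # the new default metadata update interval
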
50 changes: 25 additions & 25 deletions ch_backup/logic/table.py
@@ -4,6 +4,7 @@
import os
from collections import deque
from dataclasses import dataclass
from functools import partial
from itertools import chain
from pathlib import Path
from typing import Dict, Iterable, List, Optional, Sequence, Tuple
@@ -33,6 +34,7 @@
)
from ch_backup.exceptions import ClickhouseBackupError
from ch_backup.logic.backup_manager import BackupManager
from ch_backup.logic.upload_part_observer import UploadPartObserver
from ch_backup.util import compare_schema, get_table_zookeeper_paths


@@ -123,7 +125,7 @@ def _backup(
if table.name not in mtimes:
continue

table_meta = self._backup_table(
self._backup_table(
context,
db,
table,
@@ -132,8 +134,6 @@ def _backup(
dedup_info.table(table.name),
mtimes,
)
if table_meta is not None:
context.backup_meta.add_table(table_meta)

context.backup_layout.upload_backup_metadata(context.backup_meta)

@@ -304,7 +304,7 @@ def _backup_table(
schema_only: bool,
dedup_info: TableDedupInfo,
mtimes: Dict[str, TableMetadataMtime],
) -> Optional[TableMetadata]:
) -> None:
"""
Make backup of metadata and data of single table.
@@ -313,16 +313,14 @@ def _backup_table(
logging.debug(
'Performing table backup for "{}"."{}"', table.database, table.name
)
table_meta = TableMetadata(table.database, table.name, table.engine, table.uuid)

create_statement = self._load_create_statement_from_disk(table)
if not create_statement:
logging.warning(
'Skipping table backup for "{}"."{}". Local metadata is empty or absent',
db.name,
table.name,
)
return None
return

# Freeze only MergeTree tables
if not schema_only and is_merge_tree(table.engine):
@@ -337,7 +335,7 @@ def _backup_table(
table.database,
table.name,
)
return None
return

# Check if table metadata was updated
new_mtime = self._get_mtime(table.metadata_path)
@@ -348,25 +346,24 @@ def _backup_frozen_table_data(
table.name,
)
context.ch_ctl.remove_freezed_data()
return None
return

# Add table metadata to backup metadata
context.backup_meta.add_table(
TableMetadata(table.database, table.name, table.engine, table.uuid)
)
# Backup table metadata
context.backup_layout.upload_table_create_statement(
context.backup_meta.name, db, table, create_statement
)
# Backup table data
if not schema_only:
self._backup_frozen_table_data(
context, table, table_meta, backup_name, dedup_info
)

return table_meta
self._backup_frozen_table_data(context, table, backup_name, dedup_info)

def _backup_frozen_table_data(
self,
context: BackupContext,
table: Table,
table_meta: TableMetadata,
backup_name: str,
dedup_info: TableDedupInfo,
) -> None:
@@ -383,35 +380,38 @@ def _backup_frozen_table_data(

logging.debug('Uploading table data for "{}"."{}"', table.database, table.name)

uploaded_parts = []
upload_observer = UploadPartObserver(context)

for data_path, disk in table.paths_with_disks:
freezed_parts = context.ch_ctl.list_frozen_parts(
table, disk, data_path, backup_name
)

for fpart in freezed_parts:
logging.debug("Working on {}", fpart)
part = PartMetadata.from_frozen_part(fpart)

if disk.type == "s3":
table_meta.add_part(PartMetadata.from_frozen_part(fpart))
context.backup_meta.add_part(part)
continue

# trying to find part in storage
part = deduplicate_part(context.backup_layout, fpart, dedup_info)
if part:
deduplicated_part = deduplicate_part(
context.backup_layout, fpart, dedup_info
)
if deduplicated_part:
context.ch_ctl.remove_freezed_part(fpart)
context.backup_meta.add_part(deduplicated_part)
else:
context.backup_layout.upload_data_part(
context.backup_meta.name, fpart
context.backup_meta.name,
fpart,
partial(upload_observer, part),
)
part = PartMetadata.from_frozen_part(fpart)
uploaded_parts.append(part)

table_meta.add_part(part) # type: ignore

context.backup_layout.wait()

self._validate_uploaded_parts(context, uploaded_parts)
self._validate_uploaded_parts(context, upload_observer.uploaded_parts)

context.ch_ctl.remove_freezed_data()

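
Taken together, the table.py changes move part registration from the end of the table backup into an UploadPartObserver that is notified per part, so backup metadata can be refreshed while uploads are still in flight. A condensed sketch of the resulting per-part loop (imports match the ones added above; error handling and the S3-disk shortcut from the diff are omitted):

upload_observer = UploadPartObserver(context)

for fpart in freezed_parts:
    part = PartMetadata.from_frozen_part(fpart)
    deduplicated_part = deduplicate_part(context.backup_layout, fpart, dedup_info)
    if deduplicated_part:
        context.ch_ctl.remove_freezed_part(fpart)
        context.backup_meta.add_part(deduplicated_part)
    else:
        # The observer records the part and periodically re-uploads backup
        # metadata once the asynchronous upload of this part completes.
        context.backup_layout.upload_data_part(
            context.backup_meta.name, fpart, partial(upload_observer, part)
        )

context.backup_layout.wait()
self._validate_uploaded_parts(context, upload_observer.uploaded_parts)
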
44 changes: 44 additions & 0 deletions ch_backup/logic/upload_part_observer.py
@@ -0,0 +1,44 @@
"""Uploading part observer."""
import time
from typing import List, Optional

from ch_backup.backup.metadata.part_metadata import PartMetadata
from ch_backup.backup_context import BackupContext


class UploadPartObserver:
"""
Observe uploading parts.
Update backup metadata with specified interval after completion of
uploading every part to object storage.
"""

def __init__(self, context: BackupContext) -> None:
self._context = context
self._last_time = time.time()
self._uploaded_parts: List[PartMetadata] = []
self._interval = self._context.config["update_metadata_interval"]

def __call__(
self, part: PartMetadata, exception: Optional[Exception] = None
) -> None:
if exception:
return

self._uploaded_parts.append(part)
self._context.backup_meta.add_part(part)

now = time.time()
if now - self._last_time >= self._interval:
self._context.backup_layout.upload_backup_metadata(
self._context.backup_meta
)
self._last_time = now

@property
def uploaded_parts(self) -> List[PartMetadata]:
"""
Return uploaded parts metadata.
"""
return self._uploaded_parts
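
A small standalone illustration of the observer's contract, assuming a configured context and a part of type PartMetadata (both elided here):

observer = UploadPartObserver(context)

# Success path: the part is recorded and, if at least update_metadata_interval
# seconds have passed since the last flush, backup metadata is re-uploaded.
observer(part)

# Failure path: the exception passed in by the execution pool is ignored here;
# propagation of the failure is handled by ExecPool.wait_all().
observer(part, RuntimeError("upload failed"))

assert observer.uploaded_parts == [part]
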
55 changes: 41 additions & 14 deletions ch_backup/storage/async_pipeline/base_pipeline/exec_pool.py
@@ -1,12 +1,25 @@
"""
Class for executing callables on specified pool.
"""
from concurrent.futures import ALL_COMPLETED, Executor, Future, wait
from typing import Any, Callable, Dict
from concurrent.futures import Executor, Future, as_completed
from dataclasses import dataclass
from typing import Any, Callable, Dict, Optional

from ch_backup import logging


@dataclass
class Job:
"""
Job submitted to ExecPool.
Callback is executed after job completion.
"""

id_: str
callback: Optional[Callable]


class ExecPool:
"""
Submit tasks on provided executor.
@@ -15,7 +28,7 @@ class ExecPool:
"""

def __init__(self, executor: Executor) -> None:
self._futures: Dict[str, Future] = {}
self._future_to_job: Dict[Future, Job] = {}
self._pool = executor

def shutdown(self, graceful: bool = True) -> None:
@@ -24,15 +37,22 @@ def shutdown(self, graceful: bool = True) -> None:
"""
self._pool.shutdown(wait=graceful)

def submit(self, future_id: str, func: Callable, *args: Any, **kwargs: Any) -> None:
def submit(
self,
job_id: str,
func: Callable,
callback: Optional[Callable],
*args: Any,
**kwargs: Any
) -> None:
"""
Schedule job for execution
"""
if future_id in self._futures:
if job_id in [job.id_ for job in self._future_to_job.values()]:
raise RuntimeError("Duplicate")

future = self._pool.submit(func, *args, **kwargs)
future.add_done_callback(lambda _: logging.debug("Future {} completed", future_id)) # type: ignore[misc]
self._futures[future_id] = future
self._future_to_job[future] = Job(job_id, callback)

def wait_all(self, keep_going: bool = False) -> None:
"""
@@ -41,24 +61,31 @@ def wait_all(self, keep_going: bool = False) -> None:
Args:
keep_going - skip exceptions raised by futures instead of propagating it.
"""
wait(self._futures.values(), return_when=ALL_COMPLETED)
for future in as_completed(self._future_to_job):
job = self._future_to_job[future]
logging.debug("Future {} completed", job.id_)

for future_id, future in self._futures.items():
try:
future.result()
except Exception:
except Exception as ex:
if job.callback:
job.callback(ex)
if keep_going:
logging.warning(
'Future "{}" generated an exception, skipping due to keep_going flag',
future_id,
'Job "{}" generated an exception, skipping due to keep_going flag',
job.id_,
exc_info=True,
)
continue
logging.error(
'Future "{}" generated an exception:', future_id, exc_info=True
'Job "{}" generated an exception:', job.id_, exc_info=True
)
raise
self._futures = {}

if job.callback:
job.callback()

self._future_to_job = {}

def __del__(self) -> None:
"""
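
The callback convention established here is: callback(exception) when a job fails and callback() when it succeeds, both invoked from wait_all. A minimal self-contained sketch, using a thread pool in place of the process pool ch-backup normally uses:

from concurrent.futures import ThreadPoolExecutor
from typing import Optional

from ch_backup.storage.async_pipeline.base_pipeline.exec_pool import ExecPool

def on_done(exception: Optional[Exception] = None) -> None:
    # No argument on success, the raised exception on failure.
    print("failed" if exception else "succeeded")

pool = ExecPool(ThreadPoolExecutor(max_workers=2))
pool.submit("job-ok", lambda: 42, on_done)
pool.submit("job-boom", lambda: 1 / 0, on_done)
pool.wait_all(keep_going=True)  # the failing job is logged and skipped
pool.shutdown()
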
26 changes: 21 additions & 5 deletions ch_backup/storage/async_pipeline/pipeline_executor.py
@@ -85,6 +85,7 @@ def upload_files_tarball(
is_async: bool,
encryption: bool,
delete: bool,
callback: Optional[Callable] = None,
) -> None:
"""
Archive to tarball and upload files from local filesystem.
@@ -101,7 +102,7 @@ def upload_files_tarball(
encryption,
delete_after=delete,
)
self._exec_pipeline(job_id, pipeline, is_async)
self._exec_pipeline(job_id, pipeline, is_async, callback)

def download_data(
self, remote_path: str, is_async: bool, encryption: bool
@@ -169,15 +170,30 @@ def wait(self, keep_going: bool = False) -> None:
if self._exec_pool:
self._exec_pool.wait_all(keep_going)

def _exec_pipeline(self, job_id: str, pipeline: Callable, is_async: bool) -> Any:
def _exec_pipeline(
self,
job_id: str,
pipeline: Callable,
is_async: bool,
callback: Optional[Callable] = None,
) -> Any:
"""
Run pipeline inplace or schedule for exec in process pool
"""

if is_async and self._exec_pool:
return self._exec_pool.submit(job_id, pipeline)

return pipeline()
return self._exec_pool.submit(job_id, pipeline, callback)

try:
result = pipeline()
except Exception as ex:
if callback:
callback(ex)
raise

if callback:
callback()
return result

@staticmethod
def _make_job_id(job_name: str, *args: Any) -> str:
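
The synchronous path mirrors the pool behaviour: callback(ex) is invoked before the exception is re-raised, and callback() on success. An illustrative caller, where executor is an existing PipelineExecutor and the leading positional arguments (not shown in this diff) are placeholders:

from typing import Optional

def after_upload(exception: Optional[Exception] = None) -> None:
    # Runs inline when is_async=False, or from ExecPool.wait_all() otherwise.
    if exception is None:
        print("tarball upload finished")

executor.upload_files_tarball(
    dir_path, files, remote_path,  # placeholder arguments, elided in the diff
    is_async=True,
    encryption=True,
    delete=True,
    callback=after_upload,
)
executor.wait()
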
(Diffs for the remaining 2 changed files are not shown in this view.)
