Move freeze logic to the method

kirillgarbar committed Jul 9, 2024
1 parent 2f12cb7 commit 87f9d30

Showing 4 changed files with 137 additions and 156 deletions.

ch_backup/ch_backup.py: 7 changes (4 additions, 3 deletions)

@@ -48,9 +48,7 @@ def __init__(self, config: Config) -> None:
         self._config = config
         self._access_backup_manager = AccessBackup()
         self._database_backup_manager = DatabaseBackup()
-        self._table_backup_manager = TableBackup(
-            self._config["multiprocessing"]["freeze_threads"]
-        )
+        self._table_backup_manager = TableBackup()
         self._udf_backup_manager = UDFBackup()
         self._nc_backup_manager = NamedCollectionsBackup()

@@ -194,6 +192,9 @@ def backup(
                 databases,
                 db_tables,
                 schema_only=sources.schema_only,
+                freeze_threads=self._config["multiprocessing"][
+                    "freeze_threads"
+                ],
             )

             self._context.backup_meta.state = BackupState.CREATED
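
Note on this file's change: TableBackup no longer receives the thread count at construction time; the value is read from the multiprocessing.freeze_threads config key at the call site and passed per invocation of backup(). A minimal runnable sketch of threading a config value through the call, with simplified stand-in names (not the library's classes):

    # Illustrative sketch mirroring ClickhouseBackup.backup() -> TableBackup.backup();
    # the class and config dict here are simplified assumptions.
    config = {"multiprocessing": {"freeze_threads": 4}}

    class TableBackupSketch:
        def backup(self, schema_only: bool, freeze_threads: int) -> None:
            print(f"freezing with {freeze_threads} threads (schema_only={schema_only})")

    TableBackupSketch().backup(
        schema_only=False,
        freeze_threads=config["multiprocessing"]["freeze_threads"],
    )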

ch_backup/clickhouse/control.py: 6 changes (6 additions, 0 deletions)

@@ -481,6 +481,11 @@ def remove_freezed_data(
"""
Remove all freezed partitions from all local disks.
"""
if not (backup_name is None) == (table is None):
raise RuntimeError(
"Both backup_name and table should be None or not None at the same time"
)

if backup_name and table:
for table_data_path, disk in table.paths_with_disks:
if disk.type == "local":
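
Note on the guard above: backup_name and table must now be passed together or not at all. The expression (backup_name is None) == (table is None) is true exactly when both are None or both are set, and not binds more loosely than ==, so the negation rejects the two mixed cases. A standalone sketch of the same invariant (simplified signature, not the library's code):

    from typing import Optional

    def check_paired(backup_name: Optional[str], table: Optional[str]) -> None:
        # True for "both None" or "both set"; the negation catches the mix.
        if not (backup_name is None) == (table is None):
            raise RuntimeError("backup_name and table must be given together")

    check_paired(None, None)          # ok: operate on all freezed data
    check_paired("backup1", "t1")     # ok: operate on one table's data
    # check_paired("backup1", None)   # raises RuntimeError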

@@ -793,6 +798,7 @@ def create_shadow_increment(self) -> None:
"""
Create shadow/increment.txt to fix race condition with parallel freeze.
Must be used before freezing more than one table at once.
https://github.com/ClickHouse/ClickHouse/blob/597a72fd9afd88984abc10b284624c6b4d08368b/src/Common/Increment.h#L20
"""
default_shadow_path = Path(self._root_data_path) / "shadow"
increment_path = default_shadow_path / "increment.txt"
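
Note on the added link: it points at ClickHouse's Increment, the counter behind shadow/increment.txt that each FREEZE bumps to number its shadow directory. A hypothetical standalone sketch of what create_shadow_increment() does, assuming the counter file is seeded with "0" when absent (the real method lives in this file and uses self._root_data_path):

    from pathlib import Path

    def create_shadow_increment(root_data_path: str) -> None:
        default_shadow_path = Path(root_data_path) / "shadow"
        increment_path = default_shadow_path / "increment.txt"
        default_shadow_path.mkdir(parents=True, exist_ok=True)
        if not increment_path.exists():
            # Creating the file up front means concurrent FREEZE queries only
            # ever increment an existing counter and never race to create it.
            increment_path.write_text("0")  # seed value is an assumption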

ch_backup/logic/table.py: 265 changes (116 additions, 149 deletions)

@@ -9,7 +9,7 @@
 from functools import partial
 from itertools import chain
 from pathlib import Path
-from typing import Any, Dict, Iterable, List, Optional, Sequence, Tuple
+from typing import Dict, Iterable, List, Optional, Sequence, Tuple

 from ch_backup import logging
 from ch_backup.backup.deduplication import deduplicate_parts

@@ -35,122 +35,6 @@
 from ch_backup.util import compare_schema, get_table_zookeeper_paths


-class TableFreezer:
-    """
-    Class to freeze tables in parallel.
-    """
-
-    def __init__(self, threads: int):
-        self._futures: List[Future] = []
-        self._thread_pool: ThreadPoolExecutor = ThreadPoolExecutor(max_workers=threads)
-
-    def __enter__(self):
-        return self
-
-    def __exit__(self, *exc: Any) -> bool:  # type: ignore
-        self._thread_pool.shutdown(wait=False)
-        self._futures.clear()
-        return False
-
-    def wait(self) -> List[Tuple[Table, bytes]]:
-        """
-        Wait for all submitted tables to freeze and return create statements.
-        """
-        result = []
-        for future in as_completed(self._futures):
-            backup_freezed_table = future.result()
-            if backup_freezed_table is not None:
-                result.append(backup_freezed_table)
-        self._futures.clear()
-        return result
-
-    def freeze_table(
-        self,
-        context: BackupContext,
-        db: Database,
-        table: Table,
-        backup_name: str,
-        schema_only: bool,
-    ) -> None:
-        """
-        Submit table to the freeze pool.
-        """
-        # Create shadow/increment.txt if not exists manually to avoid
-        # race condition with parallel freeze
-        context.ch_ctl.create_shadow_increment()
-        future = self._thread_pool.submit(
-            self._freeze_table,
-            context,
-            db,
-            table,
-            backup_name,
-            schema_only,
-        )
-        self._futures.append(future)
-
-    def _freeze_table(
-        self,
-        context: BackupContext,
-        db: Database,
-        table: Table,
-        backup_name: str,
-        schema_only: bool,
-    ) -> Optional[Tuple[Table, bytes]]:
-        """
-        Freeze table and return it's create statement
-        """
-        logging.debug('Trying to freeze "{}"."{}"', table.database, table.name)
-
-        create_statement = self._load_create_statement_from_disk(table)
-        if not create_statement:
-            logging.warning(
-                'Skipping table backup for "{}"."{}". Local metadata is empty or absent',
-                db.name,
-                table.name,
-            )
-            return None
-
-        # Freeze only MergeTree tables
-        if not schema_only and is_merge_tree(table.engine):
-            try:
-                context.ch_ctl.freeze_table(backup_name, table)
-            except ClickhouseError:
-                if context.ch_ctl.does_table_exist(table.database, table.name):
-                    raise
-
-                logging.warning(
-                    'Table "{}"."{}" was removed by a user during backup',
-                    table.database,
-                    table.name,
-                )
-                return None
-
-        return (table, create_statement)
-
-    @staticmethod
-    def _load_create_statement_from_disk(table: Table) -> Optional[bytes]:
-        """
-        Load a create statement of the table from a metadata file on the disk.
-        """
-        if not table.metadata_path:
-            logging.debug(
-                'Cannot load a create statement of the table "{}"."{}". Metadata is empty',
-                table.database,
-                table.name,
-            )
-            return None
-        try:
-            return Path(table.metadata_path).read_bytes()
-        except OSError as e:
-            logging.debug(
-                'Cannot load a create statement of the table "{}"."{}": {}',
-                table.database,
-                table.name,
-                str(e),
-            )
-            return None
-
-
 @dataclass
 class TableMetadataMtime:
     """

@@ -166,32 +50,29 @@ class TableBackup(BackupManager):
     Table backup class
     """

-    def __init__(self, freeze_threads: int = 1):
-        self._freeze_threads = freeze_threads
-
     def backup(
         self,
         context: BackupContext,
         databases: Sequence[Database],
         db_tables: Dict[str, list],
         schema_only: bool,
+        freeze_threads: int,
     ) -> None:
         """
         Backup tables metadata, MergeTree data and Cloud storage metadata.
         """

         backup_name = context.backup_meta.get_sanitized_name()

-        with TableFreezer(self._freeze_threads) as table_freezer:
-            for db in databases:
-                self._backup(
-                    table_freezer,
-                    context,
-                    db,
-                    db_tables[db.name],
-                    backup_name,
-                    schema_only,
-                )
+        for db in databases:
+            self._backup(
+                context,
+                db,
+                db_tables[db.name],
+                backup_name,
+                schema_only,
+                freeze_threads,
+            )

         self._backup_cloud_storage_metadata(context)

@@ -222,12 +103,12 @@ def _collect_local_metadata_mtime(

     def _backup(
         self,
-        table_freezer: TableFreezer,
         context: BackupContext,
         db: Database,
         tables: Sequence[str],
         backup_name: str,
         schema_only: bool,
+        freeze_threads: int,
     ) -> None:
         """
         Backup single database tables.
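
Note on the hunk below: the mtimes snapshot taken before freezing implements the optimistic concurrency check referenced in the code comments; tables whose local metadata changes while the backup runs can then be detected afterwards. A standalone sketch of the idea (simplified names, not the library's code):

    import os
    import tempfile

    # Snapshot a metadata file's mtime, do the slow work, then treat any
    # change as a concurrent modification to skip or retry.
    with tempfile.NamedTemporaryFile(suffix=".sql", delete=False) as f:
        metadata_path = f.name

    before = os.stat(metadata_path).st_mtime_ns
    # ... freeze tables and upload their data here ...
    after = os.stat(metadata_path).st_mtime_ns
    if before != after:
        print(f"{metadata_path} changed during backup; skip or retry it")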

@@ -238,44 +119,130 @@ def _backup(
         # To ensure consistency between metadata and data backups.
         # See https://en.wikipedia.org/wiki/Optimistic_concurrency_control
         mtimes = self._collect_local_metadata_mtime(context, db, tables)
+        tables_ = list(
+            filter(
+                lambda table: table.name in mtimes,
+                context.ch_ctl.get_tables(db.name, tables),
+            )
+        )

-        for table in context.ch_ctl.get_tables(db.name, tables):
-            if table.name not in mtimes:
-                continue
-
-            logging.debug(
-                'Adding "{}"."{}" to the freeze queue',
-                table.database,
-                table.name,
-            )
-
-            table_freezer.freeze_table(
-                context,
-                db,
-                table,
-                backup_name,
-                schema_only,
-            )
-
-        # Wait until all tables are freezed
-        freezed_tables = table_freezer.wait()
+        freezed_tables = self._freeze_tables(
+            context, db, tables_, backup_name, schema_only, freeze_threads
+        )

         logging.debug('All tables from "{}" are frozen', db.name)

         for table, create_statement in freezed_tables:
             self._backup_freezed_table(
                 context,
                 db,
                 table,
                 backup_name,
                 schema_only,
                 mtimes,
                 create_statement,
             )

         context.ch_ctl.remove_freezed_data()

         context.backup_layout.upload_backup_metadata(context.backup_meta)

+    @staticmethod
+    def _freeze_tables(
+        context: BackupContext,
+        db: Database,
+        tables: Sequence[Table],
+        backup_name: str,
+        schema_only: bool,
+        threads: int,
+    ) -> List[Tuple[Table, bytes]]:
+        """
+        Freeze tables in parallel.
+        """
+        # Create shadow/increment.txt if not exists manually to avoid
+        # race condition with parallel freeze
+        context.ch_ctl.create_shadow_increment()
+        futures: List[Future] = []
+        with ThreadPoolExecutor(max_workers=threads) as pool:
+            for table in tables:
+                future = pool.submit(
+                    TableBackup._freeze_table,
+                    context,
+                    db,
+                    table,
+                    backup_name,
+                    schema_only,
+                )
+                futures.append(future)
+
+        result: List[Tuple[Table, bytes]] = []
+        for future in as_completed(futures):
+            table_and_statement = future.result()
+            if table_and_statement is not None:
+                result.append(table_and_statement)
+        return result
+
+    @staticmethod
+    def _freeze_table(
+        context: BackupContext,
+        db: Database,
+        table: Table,
+        backup_name: str,
+        schema_only: bool,
+    ) -> Optional[Tuple[Table, bytes]]:
+        """
+        Freeze table and return it's create statement
+        """
+        logging.debug('Trying to freeze "{}"."{}"', table.database, table.name)
+
+        create_statement = TableBackup._load_create_statement_from_disk(table)
+        if not create_statement:
+            logging.warning(
+                'Skipping table backup for "{}"."{}". Local metadata is empty or absent',
+                db.name,
+                table.name,
+            )
+            return None
+
+        # Freeze only MergeTree tables
+        if not schema_only and is_merge_tree(table.engine):
+            try:
+                context.ch_ctl.freeze_table(backup_name, table)
+            except ClickhouseError:
+                if context.ch_ctl.does_table_exist(table.database, table.name):
+                    raise
+
+                logging.warning(
+                    'Table "{}"."{}" was removed by a user during backup',
+                    table.database,
+                    table.name,
+                )
+                return None
+
+        return (table, create_statement)
+
+    @staticmethod
+    def _load_create_statement_from_disk(table: Table) -> Optional[bytes]:
+        """
+        Load a create statement of the table from a metadata file on the disk.
+        """
+        if not table.metadata_path:
+            logging.debug(
+                'Cannot load a create statement of the table "{}"."{}". Metadata is empty',
+                table.database,
+                table.name,
+            )
+            return None
+        try:
+            return Path(table.metadata_path).read_bytes()
+        except OSError as e:
+            logging.debug(
+                'Cannot load a create statement of the table "{}"."{}": {}',
+                table.database,
+                table.name,
+                str(e),
+            )
+            return None

     @staticmethod
     def _backup_cloud_storage_metadata(context: BackupContext) -> None:
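
Note on the refactoring above: the explicit TableFreezer.wait() is gone because _freeze_tables leans on the executor's context manager; leaving the with block calls shutdown(wait=True), so every future is already resolved by the time as_completed() is drained. A minimal self-contained illustration of that pattern:

    from concurrent.futures import Future, ThreadPoolExecutor, as_completed
    from typing import List

    futures: List[Future] = []
    with ThreadPoolExecutor(max_workers=4) as pool:
        for n in range(8):
            futures.append(pool.submit(pow, n, 2))

    # The pool has been joined; as_completed() yields without further waiting.
    results = sorted(future.result() for future in as_completed(futures))
    print(results)  # [0, 1, 4, 9, 16, 25, 36, 49]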

[Diff for the fourth changed file is not shown.]