Skip to content

Commit

Permalink
Move all freeze related logic to TableFreezer
Browse files Browse the repository at this point in the history
  • Loading branch information
kirillgarbar committed Jul 8, 2024
1 parent b7a4358 commit 6d1076a
Showing 1 changed file with 104 additions and 99 deletions.
203 changes: 104 additions & 99 deletions ch_backup/logic/table.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
from functools import partial
from itertools import chain
from pathlib import Path
from typing import Any, Callable, Dict, Iterable, List, Optional, Sequence, Tuple
from typing import Any, Dict, Iterable, List, Optional, Sequence, Tuple

from ch_backup import logging
from ch_backup.backup.deduplication import deduplicate_parts
Expand All @@ -35,7 +35,7 @@
from ch_backup.util import compare_schema, get_table_zookeeper_paths


class TableFreezePool:
class TableFreezer:
"""
Class to freeze tables in parallel.
"""
Expand All @@ -48,20 +48,13 @@ def __enter__(self):
return self

def __exit__(self, *exc: Any) -> bool:
self.shutdown()
self._thread_pool.shutdown(wait=False)
self._futures.clear()
return True

def submit(self, fn: Callable, *args: Any, **kwargs: Any) -> None:
"""
Submit freeze task to the executor.
Task is expected to return callable for table backup.
"""
result = self._thread_pool.submit(fn, *args, **kwargs)
self._futures.append(result)

def wait(self) -> List[Callable]:
def wait(self) -> List[Tuple[Table, bytes]]:
"""
Wait for all submitted tables to freeze and return callables to backup frozen tables.
Wait for all submitted tables to freeze and return create statements.
"""
result = []
for future in as_completed(self._futures):
Expand All @@ -71,12 +64,88 @@ def wait(self) -> List[Callable]:
self._futures.clear()
return result

def shutdown(self):
def freeze_table(
self,
context: BackupContext,
db: Database,
table: Table,
backup_name: str,
schema_only: bool,
) -> None:
"""
Clean-up the resources associated with the thread pool.
Submit table to the freeze pool.
"""
self._thread_pool.shutdown(wait=False)
self._futures.clear()
future = self._thread_pool.submit(
self._freeze_table,
context,
db,
table,
backup_name,
schema_only,
)
self._futures.append(future)

def _freeze_table(
self,
context: BackupContext,
db: Database,
table: Table,
backup_name: str,
schema_only: bool,
) -> Optional[Tuple[Table, bytes]]:
"""
Freeze table and return it's create statement
"""
logging.debug('Trying to freeze "{}"."{}"', table.database, table.name)

create_statement = self._load_create_statement_from_disk(table)
if not create_statement:
logging.warning(
'Skipping table backup for "{}"."{}". Local metadata is empty or absent',
db.name,
table.name,
)
return None

# Freeze only MergeTree tables
if not schema_only and is_merge_tree(table.engine):
try:
context.ch_ctl.freeze_table(backup_name, table)
except ClickhouseError:
if context.ch_ctl.does_table_exist(table.database, table.name):
raise

logging.warning(
'Table "{}"."{}" was removed by a user during backup',
table.database,
table.name,
)
return None

return (table, create_statement)

@staticmethod
def _load_create_statement_from_disk(table: Table) -> Optional[bytes]:
"""
Load a create statement of the table from a metadata file on the disk.
"""
if not table.metadata_path:
logging.debug(
'Cannot load a create statement of the table "{}"."{}". Metadata is empty',
table.database,
table.name,
)
return None
try:
return Path(table.metadata_path).read_bytes()
except OSError as e:
logging.debug(
'Cannot load a create statement of the table "{}"."{}": {}',
table.database,
table.name,
str(e),
)
return None


@dataclass
Expand Down Expand Up @@ -110,10 +179,10 @@ def backup(

backup_name = context.backup_meta.get_sanitized_name()

with TableFreezePool(self._freeze_threads) as freeze_pool:
with TableFreezer(self._freeze_threads) as table_freezer:
for db in databases:
self._backup(
freeze_pool,
table_freezer,
context,
db,
db_tables[db.name],
Expand Down Expand Up @@ -150,7 +219,7 @@ def _collect_local_metadata_mtime(

def _backup(
self,
freeze_pool: TableFreezePool,
table_freezer: TableFreezer,
context: BackupContext,
db: Database,
tables: Sequence[str],
Expand Down Expand Up @@ -181,22 +250,29 @@ def _backup(
table.name,
)

freeze_pool.submit(
self._freeze_table,
table_freezer.freeze_table(
context,
db,
table,
backup_name,
schema_only,
mtimes,
)

backup_functions = freeze_pool.wait()
# Wait until all tables are freezed
freezed_tables = table_freezer.wait()

logging.debug("All tables from {} are frozen", db.name)
logging.debug('All tables from "{}" are frozen', db.name)

for backup_freezed_table in backup_functions:
backup_freezed_table()
for table, create_statement in freezed_tables:
self._backup_freezed_table(
context,
db,
table,
backup_name,
schema_only,
mtimes,
create_statement,
)

context.ch_ctl.remove_freezed_data()

Expand Down Expand Up @@ -345,78 +421,7 @@ def restore(
keep_going=keep_going,
)

@staticmethod
def _load_create_statement_from_disk(table: Table) -> Optional[bytes]:
"""
Load a create statement of the table from a metadata file on the disk.
"""
if not table.metadata_path:
logging.debug(
'Cannot load a create statement of the table "{}"."{}". Metadata is empty',
table.database,
table.name,
)
return None
try:
return Path(table.metadata_path).read_bytes()
except OSError as e:
logging.debug(
'Cannot load a create statement of the table "{}"."{}": {}',
table.database,
table.name,
str(e),
)
return None

def _freeze_table(
self,
context: BackupContext,
db: Database,
table: Table,
backup_name: str,
schema_only: bool,
mtimes: Dict[str, TableMetadataMtime],
) -> Optional[Callable]:
"""
Freeze table and return function which backups freezed table.
"""
logging.debug('Trying to freeze "{}"."{}"', table.database, table.name)
create_statement = self._load_create_statement_from_disk(table)
if not create_statement:
logging.warning(
'Skipping table backup for "{}"."{}". Local metadata is empty or absent',
db.name,
table.name,
)
return None

# Freeze only MergeTree tables
if not schema_only and is_merge_tree(table.engine):
try:
context.ch_ctl.freeze_table(backup_name, table)
except ClickhouseError:
if context.ch_ctl.does_table_exist(table.database, table.name):
raise

logging.warning(
'Table "{}"."{}" was removed by a user during backup',
table.database,
table.name,
)
return None

return partial(
self._backup_table_after_freeze,
context,
db,
table,
backup_name,
schema_only,
mtimes,
create_statement,
)

def _backup_table_after_freeze(
def _backup_freezed_table(
self,
context: BackupContext,
db: Database,
Expand Down

0 comments on commit 6d1076a

Please sign in to comment.