Parallel FREEZE TABLE (#164)
* Freeze and backup thread pools

* Remove freezed data for one table

* Freeze workers option in config

* More flexibility of data generation for tests

* Parallel freeze test

* Move executors to subclass

* Partition by prefix to fix dedup tests

* Fix codespell

* Fix for old python

* Remove backup executor and backup in the main thread

* Fix black

* Fix race condition with shadow/increment.txt

* Parallel backup

* Fix unit test - can't pickle lock

* Revert "Fix unit test - can't pickle lock"

This reverts commit 6933163.

* Revert "Parallel backup"

This reverts commit 0214f1e.

* Backup tables after all tables are frozen

* Move thread pool related logic to separate class

* Move all freeze related logic to TableFreezer

* Small fixes

* Fix exception suppression in TableFreezer's __exit__

* Fix mypy

* Fix black

* Move freeze logic to the method
kirillgarbar authored Jul 10, 2024
1 parent 0b008ff commit 248e0ce
Showing 10 changed files with 324 additions and 96 deletions.
3 changes: 3 additions & 0 deletions ch_backup/ch_backup.py
@@ -177,6 +177,9 @@ def backup(
databases,
db_tables,
schema_only=sources.schema_only,
freeze_threads=self._config["multiprocessing"][
"freeze_threads"
],
)

self._context.backup_meta.state = BackupState.CREATED
51 changes: 45 additions & 6 deletions ch_backup/clickhouse/control.py
@@ -24,6 +24,7 @@
from ch_backup.exceptions import ClickhouseBackupError
from ch_backup.util import (
chown_dir_contents,
chown_file,
escape,
list_dir_files,
retry,
@@ -474,15 +475,32 @@ def system_unfreeze(self, backup_name: str) -> None:
query_sql = SYSTEM_UNFREEZE_SQL.format(backup_name=backup_name)
self._ch_client.query(query_sql, timeout=self._unfreeze_timeout)

def remove_freezed_data(self) -> None:
def remove_freezed_data(
self, backup_name: Optional[str] = None, table: Optional[Table] = None
) -> None:
"""
Remove all freezed partitions from all local disks.
"""
for disk in self._disks.values():
if disk.type == "local":
shadow_path = os.path.join(disk.path, "shadow")
logging.debug("Removing shadow data: {}", shadow_path)
self._remove_shadow_data(shadow_path)
if not (backup_name is None) == (table is None):
raise RuntimeError(
"backup_name and table must either both be set or both be None"
)

if backup_name and table:
for table_data_path, disk in table.paths_with_disks:
if disk.type == "local":
table_relative_path = os.path.relpath(table_data_path, disk.path)
shadow_path = os.path.join(
disk.path, "shadow", backup_name, table_relative_path
)
logging.debug("Removing shadow data: {}", shadow_path)
self._remove_shadow_data(shadow_path)
else:
for disk in self._disks.values():
if disk.type == "local":
shadow_path = os.path.join(disk.path, "shadow")
logging.debug("Removing shadow data: {}", shadow_path)
self._remove_shadow_data(shadow_path)

def remove_freezed_part(self, part: FrozenPart) -> None:
"""
@@ -776,6 +794,27 @@ def chown_dir(self, dir_path: str) -> None:
need_recursion,
)

def create_shadow_increment(self) -> None:
"""
Create shadow/increment.txt to avoid a race condition with parallel freeze.
Must be called before freezing more than one table at once.
https://github.com/ClickHouse/ClickHouse/blob/597a72fd9afd88984abc10b284624c6b4d08368b/src/Common/Increment.h#L20
"""
default_shadow_path = Path(self._root_data_path) / "shadow"
increment_path = default_shadow_path / "increment.txt"
if os.path.exists(increment_path):
return
if not os.path.exists(default_shadow_path):
os.mkdir(default_shadow_path)
self.chown_dir(str(default_shadow_path))
with open(increment_path, "w", encoding="utf-8") as file:
file.write("0")
chown_file(
self._ch_ctl_config["user"],
self._ch_ctl_config["group"],
str(increment_path),
)

@retry(OSError)
def _remove_shadow_data(self, path: str) -> None:
if path.find("/shadow") == -1:
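For context on the increment fix: as the commit history notes ("Fix race condition with shadow/increment.txt"), freezing several tables at once used to race on the creation of shadow/increment.txt, the counter file that ClickHouse's FREEZE updates. Since create_shadow_increment() runs once in the main thread before the worker pool starts, every freeze thread then takes the "file exists" path. The sketch below is hypothetical illustration code (not from ClickHouse or ch-backup, and ClickHouse's own locking details are omitted); it shows the generic check-then-create hazard that pre-creating the file sidesteps:

import threading
from pathlib import Path

def naive_next_increment(path: Path) -> int:
    """Check-then-act: racy when two threads both see the file as missing."""
    value = int(path.read_text()) if path.exists() else 0  # both threads may read "missing" -> 0
    path.write_text(str(value + 1))  # ...and both may write "1"
    return value + 1

def ensure_increment_file(path: Path) -> None:
    """Pre-create the counter once, before any concurrency starts, so the
    'file missing' branch never executes concurrently -- the idea behind
    create_shadow_increment() above."""
    path.parent.mkdir(parents=True, exist_ok=True)
    if not path.exists():
        path.write_text("0")

increment = Path("/tmp/shadow-demo/increment.txt")
increment.parent.mkdir(parents=True, exist_ok=True)
increment.unlink(missing_ok=True)

results: list = []
workers = [
    threading.Thread(target=lambda: results.append(naive_next_increment(increment)))
    for _ in range(2)
]
for w in workers:
    w.start()
for w in workers:
    w.join()
print(sorted(results))  # may print [1, 1] instead of [1, 2]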
2 changes: 2 additions & 0 deletions ch_backup/config.py
@@ -156,6 +156,8 @@ def _as_seconds(t: str) -> int:
"workers": 4,
# The number of processes for parts restoring from S3 disks.
"cloud_storage_restore_workers": 4,
# The number of threads for parallel freeze of tables
"freeze_threads": 4,
},
"pipeline": {
# Is asynchronous pipelines used (based on Pypeln library)
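Operationally, the new knob sits next to the existing multiprocessing settings. A hypothetical excerpt of a user config (assuming a YAML configuration file whose sections mirror the Python defaults above; setting freeze_threads: 1 would restore the previous sequential behaviour):

multiprocessing:
    workers: 4
    # The number of processes for parts restoring from S3 disks.
    cloud_storage_restore_workers: 4
    # The number of threads for parallel freeze of tables
    freeze_threads: 4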
188 changes: 126 additions & 62 deletions ch_backup/logic/table.py
@@ -4,6 +4,7 @@

import os
from collections import deque
from concurrent.futures import Future, ThreadPoolExecutor, as_completed
from dataclasses import dataclass
from functools import partial
from itertools import chain
@@ -55,10 +56,12 @@ def backup(
databases: Sequence[Database],
db_tables: Dict[str, list],
schema_only: bool,
freeze_threads: int,
) -> None:
"""
Backup tables metadata, MergeTree data and Cloud storage metadata.
"""

backup_name = context.backup_meta.get_sanitized_name()

for db in databases:
@@ -68,7 +71,9 @@
db_tables[db.name],
backup_name,
schema_only,
freeze_threads,
)

self._backup_cloud_storage_metadata(context)

def _collect_local_metadata_mtime(
@@ -103,6 +108,7 @@ def _backup(
tables: Sequence[str],
backup_name: str,
schema_only: bool,
freeze_threads: int,
) -> None:
"""
Backup single database tables.
@@ -113,22 +119,131 @@
# To ensure consistency between metadata and data backups.
# See https://en.wikipedia.org/wiki/Optimistic_concurrency_control
mtimes = self._collect_local_metadata_mtime(context, db, tables)
tables_ = list(
filter(
lambda table: table.name in mtimes,
context.ch_ctl.get_tables(db.name, tables),
)
)

for table in context.ch_ctl.get_tables(db.name, tables):
if table.name not in mtimes:
continue
freezed_tables = self._freeze_tables(
context, db, tables_, backup_name, schema_only, freeze_threads
)

self._backup_table(
logging.debug('All tables from "{}" are frozen', db.name)

for table, create_statement in freezed_tables:
self._backup_freezed_table(
context,
db,
table,
backup_name,
schema_only,
mtimes,
create_statement,
)

context.ch_ctl.remove_freezed_data()

context.backup_layout.upload_backup_metadata(context.backup_meta)

@staticmethod
def _freeze_tables(
context: BackupContext,
db: Database,
tables: Sequence[Table],
backup_name: str,
schema_only: bool,
threads: int,
) -> List[Tuple[Table, bytes]]:
"""
Freeze tables in parallel.
"""
# Manually create shadow/increment.txt if it does not exist,
# to avoid a race condition with parallel freeze
context.ch_ctl.create_shadow_increment()
futures: List[Future] = []
with ThreadPoolExecutor(max_workers=threads) as pool:
for table in tables:
future = pool.submit(
TableBackup._freeze_table,
context,
db,
table,
backup_name,
schema_only,
)
futures.append(future)

result: List[Tuple[Table, bytes]] = []
for future in as_completed(futures):
table_and_statement = future.result()
if table_and_statement is not None:
result.append(table_and_statement)
return result

@staticmethod
def _freeze_table(
context: BackupContext,
db: Database,
table: Table,
backup_name: str,
schema_only: bool,
) -> Optional[Tuple[Table, bytes]]:
"""
Freeze the table and return its create statement
"""
logging.debug('Trying to freeze "{}"."{}"', table.database, table.name)

create_statement = TableBackup._load_create_statement_from_disk(table)
if not create_statement:
logging.warning(
'Skipping table backup for "{}"."{}". Local metadata is empty or absent',
db.name,
table.name,
)
return None

# Freeze only MergeTree tables
if not schema_only and is_merge_tree(table.engine):
try:
context.ch_ctl.freeze_table(backup_name, table)
except ClickhouseError:
if context.ch_ctl.does_table_exist(table.database, table.name):
raise

logging.warning(
'Table "{}"."{}" was removed by a user during backup',
table.database,
table.name,
)
return None

return (table, create_statement)

@staticmethod
def _load_create_statement_from_disk(table: Table) -> Optional[bytes]:
"""
Load a create statement of the table from a metadata file on the disk.
"""
if not table.metadata_path:
logging.debug(
'Cannot load a create statement of the table "{}"."{}". Metadata is empty',
table.database,
table.name,
)
return None
try:
return Path(table.metadata_path).read_bytes()
except OSError as e:
logging.debug(
'Cannot load a create statement of the table "{}"."{}": {}',
table.database,
table.name,
str(e),
)
return None

@staticmethod
def _backup_cloud_storage_metadata(context: BackupContext) -> None:
"""
@@ -272,70 +387,16 @@ def restore(
keep_going=keep_going,
)

@staticmethod
def _load_create_statement_from_disk(table: Table) -> Optional[bytes]:
"""
Load a create statement of the table from a metadata file on the disk.
"""
if not table.metadata_path:
logging.debug(
'Cannot load a create statement of the table "{}"."{}". Metadata is empty',
table.database,
table.name,
)
return None
try:
return Path(table.metadata_path).read_bytes()
except OSError as e:
logging.debug(
'Cannot load a create statement of the table "{}"."{}": {}',
table.database,
table.name,
str(e),
)
return None

def _backup_table(
def _backup_freezed_table(
self,
context: BackupContext,
db: Database,
table: Table,
backup_name: str,
schema_only: bool,
mtimes: Dict[str, TableMetadataMtime],
create_statement: bytes,
) -> None:
"""
Back up the metadata and data of a single frozen table.
"""
logging.debug(
'Performing table backup for "{}"."{}"', table.database, table.name
)
create_statement = self._load_create_statement_from_disk(table)
if not create_statement:
logging.warning(
'Skipping table backup for "{}"."{}". Local metadata is empty or absent',
db.name,
table.name,
)
return

# Freeze only MergeTree tables
if not schema_only and is_merge_tree(table.engine):
try:
context.ch_ctl.freeze_table(backup_name, table)
except ClickhouseError:
if context.ch_ctl.does_table_exist(table.database, table.name):
raise

logging.warning(
'Table "{}"."{}" was removed by a user during backup',
table.database,
table.name,
)
return

# Check if table metadata was updated
new_mtime = self._get_mtime(table.metadata_path)
if new_mtime is None or mtimes[table.name].mtime != new_mtime:
@@ -344,9 +405,12 @@ def _backup_table(
table.database,
table.name,
)
context.ch_ctl.remove_freezed_data()
context.ch_ctl.remove_freezed_data(backup_name, table)
return

logging.debug(
'Performing table backup for "{}"."{}"', table.database, table.name
)
# Add table metadata to backup metadata
context.backup_meta.add_table(
TableMetadata(table.database, table.name, table.engine, table.uuid)
@@ -436,7 +500,7 @@ def deduplicate_parts_in_batch(

self._validate_uploaded_parts(context, upload_observer.uploaded_parts)

context.ch_ctl.remove_freezed_data()
context.ch_ctl.remove_freezed_data(backup_name, table)

@staticmethod
def _validate_uploaded_parts(context: BackupContext, uploaded_parts: list) -> None:
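Two standard-library properties that the freeze pattern above relies on: the ThreadPoolExecutor context manager only exits after every submitted task has finished, so the freeze phase fully completes before the sequential per-table backup starts (the "Backup tables after all tables are frozen" step from the commit message). And Future.result() re-raises any exception thrown inside a worker, so a failed freeze aborts the backup instead of being silently dropped. A minimal self-contained sketch of this behaviour, with hypothetical names rather than repository code:

from concurrent.futures import Future, ThreadPoolExecutor, as_completed
from typing import List

def freeze_one(table: str) -> str:
    """Stand-in for a per-table freeze; raises to simulate a failure."""
    if table == "broken":
        raise RuntimeError(f"cannot freeze {table}")
    return table

def freeze_all(tables: List[str], threads: int = 4) -> List[str]:
    futures: List[Future] = []
    with ThreadPoolExecutor(max_workers=threads) as pool:
        for table in tables:
            futures.append(pool.submit(freeze_one, table))
    # The `with` block exits only after all tasks have completed.
    frozen = []
    for future in as_completed(futures):
        frozen.append(future.result())  # re-raises a worker's exception here
    return frozen

print(freeze_all(["t1", "t2", "t3"]))  # completion order, not submission order
# freeze_all(["t1", "broken"]) raises RuntimeError: cannot freeze broken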
7 changes: 7 additions & 0 deletions ch_backup/util.py
@@ -78,6 +78,13 @@ def chown_dir_contents(
shutil.chown(os.path.join(dir_path, path), user, group)


def chown_file(user: str, group: str, file_path: str) -> None:
"""
Change file user/group
"""
shutil.chown(file_path, user, group)


def list_dir_files(dir_path: str) -> List[str]:
"""
Returns paths of all files of directory (recursively), relative to its path
1 change: 1 addition & 0 deletions tests/integration/ch_backup.featureset
@@ -12,6 +12,7 @@ features/cloud_storage.feature
features/database_engines.feature
features/ddl_dictionary.feature
features/deduplication.feature
features/freeze_parallel.feature
features/incremental_restore.feature
features/metadata.feature
features/min_interval.feature
(Diffs for the remaining 4 changed files are not shown.)