Parallel FREEZE TABLE #164

Merged: 24 commits, Jul 10, 2024

Changes from 12 commits

Commits
4c3acbf
Freeze and backup thread pools
kirillgarbar Jun 19, 2024
faecbed
Remove freezed data for one table
kirillgarbar Jun 20, 2024
8807c86
Freeze workers option in config
kirillgarbar Jun 20, 2024
1e86b09
More flexibility of data generation for tests
kirillgarbar Jun 22, 2024
6b7182b
Parallel freeze test
kirillgarbar Jun 22, 2024
c45b386
Move executors to subclass
kirillgarbar Jun 22, 2024
ec20451
Partition by prefix to fix dedup tests
kirillgarbar Jun 22, 2024
8b795ae
Fix codespell
kirillgarbar Jun 24, 2024
4f8bfec
Fix for old python
kirillgarbar Jun 24, 2024
7554870
Remove backup executor and backup in the main thread
kirillgarbar Jun 25, 2024
06cb097
Fix black
kirillgarbar Jun 25, 2024
95bc77b
Fix race condition with shadow/increment.txt
kirillgarbar Jun 25, 2024
0214f1e
Parallel backup
kirillgarbar Jul 1, 2024
6933163
Fix unit test - can't pickle lock
kirillgarbar Jul 1, 2024
d0a25c9
Revert "Fix unit test - can't pickle lock"
kirillgarbar Jul 8, 2024
827a66d
Revert "Parallel backup"
kirillgarbar Jul 8, 2024
93da739
Backup tables after all tables are frozen
kirillgarbar Jul 8, 2024
b7a4358
Move thread pool related logic to separate class
kirillgarbar Jul 8, 2024
6d1076a
Move all freeze related logic to TableFreezer
kirillgarbar Jul 8, 2024
c70ad1f
Small fixes
kirillgarbar Jul 8, 2024
190539c
Fix exception suppression in TableFreezer's __exit__
kirillgarbar Jul 8, 2024
59fdf82
Fix mypy
kirillgarbar Jul 9, 2024
2f12cb7
Fix black
kirillgarbar Jul 9, 2024
87f9d30
Move freeze logic to the method
kirillgarbar Jul 9, 2024
4 changes: 3 additions & 1 deletion ch_backup/ch_backup.py
@@ -48,7 +48,9 @@ def __init__(self, config: Config) -> None:
self._config = config
self._access_backup_manager = AccessBackup()
self._database_backup_manager = DatabaseBackup()
self._table_backup_manager = TableBackup()
self._table_backup_manager = TableBackup(
self._config.get("multiprocessing").get("freeze_workers", 1)
Contributor comment:

It makes sense to use get() only when a default value is expected. The error raised by [] for an absent key is more readable.

)
self._udf_backup_manager = UDFBackup()
self._nc_backup_manager = NamedCollectionsBackup()

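A minimal sketch of the convention the reviewer points at; the dict and keys below are illustrative, not ch-backup's actual config object:

    # Hypothetical config dict, only to illustrate the two access styles.
    config = {"multiprocessing": {"freeze_workers": 4}}

    # [] for required keys: a missing key fails loudly with a readable KeyError.
    workers = config["multiprocessing"]["freeze_workers"]

    # get() only where a fallback default is genuinely expected.
    workers_or_default = config["multiprocessing"].get("freeze_workers", 1)
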
45 changes: 39 additions & 6 deletions ch_backup/clickhouse/control.py
@@ -24,6 +24,7 @@
from ch_backup.exceptions import ClickhouseBackupError
from ch_backup.util import (
chown_dir_contents,
chown_file,
escape,
list_dir_files,
retry,
@@ -474,15 +475,27 @@ def system_unfreeze(self, backup_name: str) -> None:
query_sql = SYSTEM_UNFREEZE_SQL.format(backup_name=backup_name)
self._ch_client.query(query_sql, timeout=self._unfreeze_timeout)

def remove_freezed_data(self) -> None:
def remove_freezed_data(
self, backup_name: Optional[str] = None, table: Optional[Table] = None
Contributor comment:

Let's introduce a new object with backup_name and table as members, or add a check that throws an exception when only one of the pair is provided (see the sketch just after this method).

) -> None:
"""
Remove all freezed partitions from all local disks.
"""
for disk in self._disks.values():
if disk.type == "local":
shadow_path = os.path.join(disk.path, "shadow")
logging.debug("Removing shadow data: {}", shadow_path)
self._remove_shadow_data(shadow_path)
if backup_name and table:
for table_data_path, disk in table.paths_with_disks:
if disk.type == "local":
table_relative_path = os.path.relpath(table_data_path, disk.path)
shadow_path = os.path.join(
disk.path, "shadow", backup_name, table_relative_path
)
logging.debug("Removing shadow data: {}", shadow_path)
self._remove_shadow_data(shadow_path)
else:
for disk in self._disks.values():
if disk.type == "local":
shadow_path = os.path.join(disk.path, "shadow")
logging.debug("Removing shadow data: {}", shadow_path)
self._remove_shadow_data(shadow_path)
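
A minimal sketch of the argument check the reviewer suggests; the helper name and error type are illustrative and not part of the PR:

    from typing import Optional

    def check_removal_args(backup_name: Optional[str], table: Optional[object]) -> None:
        """Illustrative guard: reject calls where only one of the pair is provided."""
        if (backup_name is None) != (table is None):
            raise ValueError(
                "backup_name and table must be passed together or both omitted"
            )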

def remove_freezed_part(self, part: FrozenPart) -> None:
"""
@@ -776,6 +789,26 @@ def chown_dir(self, dir_path: str) -> None:
need_recursion,
)

def create_shadow_increment(self) -> None:
"""
Create shadow/increment.txt to fix race condition with parallel freeze.
Contributor comment:

Is there an issue or a comment in the ClickHouse code about this that could be referenced here?

Must be used before freezing more than one table at once.
"""
default_shadow_path = Path(self._root_data_path) / "shadow"
increment_path = default_shadow_path / "increment.txt"
if os.path.exists(increment_path):
return
if not os.path.exists(default_shadow_path):
os.mkdir(default_shadow_path)
self.chown_dir(str(default_shadow_path))
with open(increment_path, "w", encoding="utf-8") as file:
file.write("0")
chown_file(
self._ch_ctl_config["user"],
self._ch_ctl_config["group"],
str(increment_path),
)

@retry(OSError)
def _remove_shadow_data(self, path: str) -> None:
if path.find("/shadow") == -1:
2 changes: 2 additions & 0 deletions ch_backup/config.py
@@ -156,6 +156,8 @@ def _as_seconds(t: str) -> int:
"workers": 4,
# The number of processes for parts restoring from S3 disks.
"cloud_storage_restore_workers": 4,
# The number of processes for parallel freeze of tables
"freeze_workers": 4,
},
"pipeline": {
# Is asynchronous pipelines used (based on Pypeln library)
99 changes: 75 additions & 24 deletions ch_backup/logic/table.py
@@ -4,11 +4,12 @@

import os
from collections import deque
from concurrent.futures import ThreadPoolExecutor, as_completed
from dataclasses import dataclass
from functools import partial
from itertools import chain
from pathlib import Path
from typing import Dict, Iterable, List, Optional, Sequence, Tuple
from typing import Callable, Dict, Iterable, List, Optional, Sequence, Tuple

from ch_backup import logging
from ch_backup.backup.deduplication import deduplicate_parts
@@ -49,6 +50,9 @@ class TableBackup(BackupManager):
Table backup class
"""

def __init__(self, freeze_workers: int = 1):
self._freeze_workers = freeze_workers

def backup(
self,
context: BackupContext,
@@ -59,6 +63,7 @@ def backup(
"""
Backup tables metadata, MergeTree data and Cloud storage metadata.
"""

backup_name = context.backup_meta.get_sanitized_name()

for db in databases:
@@ -69,6 +74,7 @@
backup_name,
schema_only,
)

self._backup_cloud_storage_metadata(context)

def _collect_local_metadata_mtime(
@@ -114,18 +120,43 @@ def _backup(
# See https://en.wikipedia.org/wiki/Optimistic_concurrency_control
mtimes = self._collect_local_metadata_mtime(context, db, tables)

for table in context.ch_ctl.get_tables(db.name, tables):
if table.name not in mtimes:
continue
# Create shadow/increment.txt if not exists manually to avoid
# race condition with parallel freeze
context.ch_ctl.create_shadow_increment()

self._backup_table(
context,
db,
table,
backup_name,
schema_only,
mtimes,
)
with ThreadPoolExecutor(
max_workers=self._freeze_workers
) as freeze_executor:
freeze_futures = []

for table in context.ch_ctl.get_tables(db.name, tables):
if table.name not in mtimes:
continue

logging.debug(
'Adding "{}"."{}" to the freeze and backup queue',
table.database,
table.name,
)

freeze_futures.append(
freeze_executor.submit(
self._freeze_table,
context,
db,
table,
backup_name,
schema_only,
mtimes,
)
)

for freeze_future in as_completed(freeze_futures):
backup_freezed_table = freeze_future.result()
if backup_freezed_table is not None:
backup_freezed_table()

context.ch_ctl.remove_freezed_data()

context.backup_layout.upload_backup_metadata(context.backup_meta)

@@ -295,31 +326,27 @@ def _load_create_statement_from_disk(table: Table) -> Optional[bytes]:
)
return None

def _backup_table(
def _freeze_table(
self,
context: BackupContext,
db: Database,
table: Table,
backup_name: str,
schema_only: bool,
mtimes: Dict[str, TableMetadataMtime],
) -> None:
) -> Optional[Callable]:
"""
Make backup of metadata and data of single table.

Return backup metadata of successfully backuped table, otherwise None.
Freeze table and return function which backups freezed table.
"""
logging.debug(
'Performing table backup for "{}"."{}"', table.database, table.name
)
logging.debug('Trying to freeze "{}"."{}"', table.database, table.name)
create_statement = self._load_create_statement_from_disk(table)
if not create_statement:
logging.warning(
'Skipping table backup for "{}"."{}". Local metadata is empty or absent',
db.name,
table.name,
)
return
return None

# Freeze only MergeTree tables
if not schema_only and is_merge_tree(table.engine):
@@ -334,8 +361,29 @@ def _backup_table(
table.database,
table.name,
)
return
return None

return partial(
self._backup_table_after_freeze,
context,
db,
table,
backup_name,
schema_only,
mtimes,
create_statement,
)

def _backup_table_after_freeze(
self,
context: BackupContext,
db: Database,
table: Table,
backup_name: str,
schema_only: bool,
mtimes: Dict[str, TableMetadataMtime],
create_statement: bytes,
) -> None:
# Check if table metadata was updated
new_mtime = self._get_mtime(table.metadata_path)
if new_mtime is None or mtimes[table.name].mtime != new_mtime:
Expand All @@ -344,9 +392,12 @@ def _backup_table(
table.database,
table.name,
)
context.ch_ctl.remove_freezed_data()
context.ch_ctl.remove_freezed_data(backup_name, table)
return

logging.debug(
'Performing table backup for "{}"."{}"', table.database, table.name
)
# Add table metadata to backup metadata
context.backup_meta.add_table(
TableMetadata(table.database, table.name, table.engine, table.uuid)
@@ -436,7 +487,7 @@ def deduplicate_parts_in_batch(

self._validate_uploaded_parts(context, upload_observer.uploaded_parts)

context.ch_ctl.remove_freezed_data()
context.ch_ctl.remove_freezed_data(backup_name, table)

@staticmethod
def _validate_uploaded_parts(context: BackupContext, uploaded_parts: list) -> None:
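A standalone sketch of the freeze-then-backup pattern this diff introduces: FREEZE work runs concurrently in a thread pool, each task returns a callable, and the callables are then executed in the submitting thread as they complete. The table names and work functions below are illustrative and assume nothing about ch-backup internals:

    from concurrent.futures import ThreadPoolExecutor, as_completed
    from functools import partial
    from typing import Callable, List, Optional

    def freeze(table: str) -> Optional[Callable[[], None]]:
        # Freeze one table; return a callable that backs it up, or None to skip it.
        print(f"freezing {table}")
        return partial(print, f"backing up {table}")

    def backup_all(tables: List[str], freeze_workers: int = 4) -> None:
        # Freeze concurrently: FREEZE queries are I/O-bound, so threads overlap well.
        with ThreadPoolExecutor(max_workers=freeze_workers) as executor:
            futures = [executor.submit(freeze, t) for t in tables]
            # Back up in the current thread as each freeze completes.
            for future in as_completed(futures):
                backup_frozen = future.result()
                if backup_frozen is not None:
                    backup_frozen()

    backup_all([f"db.table_{i}" for i in range(3)], freeze_workers=2)

Returning a callable from the freeze task (the diff uses functools.partial for this) keeps the upload and metadata updates serialized in one thread while still overlapping the freeze queries.
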
7 changes: 7 additions & 0 deletions ch_backup/util.py
@@ -78,6 +78,13 @@ def chown_dir_contents(
shutil.chown(os.path.join(dir_path, path), user, group)


def chown_file(user: str, group: str, file_path: str) -> None:
"""
Change file user/group
"""
shutil.chown(file_path, user, group)


def list_dir_files(dir_path: str) -> List[str]:
"""
Returns paths of all files of directory (recursively), relative to its path
1 change: 1 addition & 0 deletions tests/integration/ch_backup.featureset
@@ -12,6 +12,7 @@ features/cloud_storage.feature
features/database_engines.feature
features/ddl_dictionary.feature
features/deduplication.feature
features/freeze_parallel.feature
features/incremental_restore.feature
features/metadata.feature
features/min_interval.feature
31 changes: 31 additions & 0 deletions tests/integration/features/freeze_parallel.feature
@@ -0,0 +1,31 @@
Feature: Parallel freeze

Background:
Given default configuration
And a working s3
And a working zookeeper on zookeeper01
And a working clickhouse on clickhouse01
And a working clickhouse on clickhouse02
And clickhouse on clickhouse01 has test schema with 5 databases and 10 tables
And clickhouse01 has test clickhouse data test1 with 5 databases, 10 tables, 100 rows and 5 partitions

Scenario: Create backup with single freeze worker
Given ch-backup configuration on clickhouse01
"""
multiprocessing:
freeze_workers: 1
"""
When we create clickhouse01 clickhouse backup
Then we got the following backups on clickhouse01
| num | state | data_count | link_count | title |
| 0 | created | 250 | 0 | shared |
When we restore clickhouse backup #0 to clickhouse02
Then we got same clickhouse data at clickhouse01 clickhouse02

Scenario: Create backup with default number of freeze workers
When we create clickhouse01 clickhouse backup
Then we got the following backups on clickhouse01
| num | state | data_count | link_count | title |
| 0 | created | 250 | 0 | shared |
When we restore clickhouse backup #0 to clickhouse02
Then we got same clickhouse data at clickhouse01 clickhouse02