Skip to content

Commit

Permalink
Parallel system drop replica
Browse files Browse the repository at this point in the history
  • Loading branch information
MikhailBurdukov committed Oct 21, 2024
1 parent 93363b2 commit cd08798
Show file tree
Hide file tree
Showing 3 changed files with 30 additions and 9 deletions.
1 change: 1 addition & 0 deletions ch_backup/ch_backup.py
Original file line number Diff line number Diff line change
Expand Up @@ -527,6 +527,7 @@ def _restore(
select_replica_drop(
replica_name, self._context.ch_ctl.get_macros()
),
self._config["multiprocessing"]["drop_replica_threads"],
)

# Restore databases.
Expand Down
36 changes: 27 additions & 9 deletions ch_backup/clickhouse/metadata_cleaner.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

import copy
import os
from concurrent.futures import Future, ThreadPoolExecutor
from typing import Dict, List, Optional

from ch_backup import logging
Expand Down Expand Up @@ -38,22 +39,24 @@ class MetadataCleaner:
Class for cleaning up replica metadata from zookeeper.
"""

def __init__(self, ch_ctl: ClickhouseCTL, replica_to_drop: str) -> None:
def __init__(
self, ch_ctl: ClickhouseCTL, replica_to_drop: str, max_workers: int
) -> None:
self._ch_ctl = ch_ctl
self._macros = self._ch_ctl.get_macros()
self._replica_to_drop = replica_to_drop or self._macros.get("replica")

if not self._replica_to_drop:
raise ConfigurationError(
"Can't get the replica name. Please, specify it through macros or replica_name knob."
)
self._exec_pool = ThreadPoolExecutor(max_workers)

def clean_tables_metadata(self, replicated_tables: List[Table]) -> None:
"""
Remove replica tables metadata from zookeeper.
"""
replicated_table_paths = get_table_zookeeper_paths(replicated_tables)

tasks: Dict[str, Future] = {}
for table, table_path in replicated_table_paths:
table_macros = copy.copy(self._macros)
macros_to_override = dict(
Expand All @@ -64,14 +67,21 @@ def clean_tables_metadata(self, replicated_tables: List[Table]) -> None:
path_resolved = os.path.abspath(replace_macros(table_path, table_macros))

logging.debug(
"Removing replica {} from table {} metadata from zookeeper {}.",
"Scheduling drop replica {} from table {} metadata from zookeeper {}.",
self._replica_to_drop,
f"{table.database}.{table.database}",
path_resolved,
)
future = self._exec_pool.submit(
self._ch_ctl.system_drop_replica, self._replica_to_drop, path_resolved # type: ignore
)
tasks[f"{table.database}.{table.name}"] = future

for full_table_name, future in tasks.items():
try:
self._ch_ctl.system_drop_replica(
replica=self._replica_to_drop, zookeeper_path=path_resolved # type: ignore
future.result()
logging.debug(
"Successful zk metadata cleanup for table {}", full_table_name
)
except ClickhouseError as ch_error:
if "does not look like a table path" in str(ch_error):
Expand All @@ -93,6 +103,7 @@ def clean_database_metadata(self, replicated_databases: List[Database]) -> None:
return

replicated_databases_paths = get_database_zookeeper_paths(replicated_databases)
tasks: Dict[str, Future] = {}

for database, database_path, shard in replicated_databases_paths:
db_macros = copy.copy(self._macros)
Expand All @@ -106,14 +117,21 @@ def clean_database_metadata(self, replicated_databases: List[Database]) -> None:
)

logging.debug(
"Removing replica {} from database {} metadata from zookeeper {}.",
"Scheduling replica {} from database {} metadata from zookeeper {}.",
full_replica_name,
database.name,
path_resolved,
)
future = self._exec_pool.submit(
self._ch_ctl.system_drop_database_replica, self._replica_to_drop, path_resolved # type: ignore
)
tasks[f"{database.name}"] = future

for database_name, future in tasks.items():
try:
self._ch_ctl.system_drop_database_replica(
replica=full_replica_name, zookeeper_path=path_resolved # type: ignore
future.result()
logging.debug(
"Successful zk metadata cleanup for database {}", database_name
)
except ClickhouseError as ch_error:
if "does not look like a path of Replicated database" in str(
Expand Down
2 changes: 2 additions & 0 deletions ch_backup/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,8 @@ def _as_seconds(t: str) -> int:
"cloud_storage_restore_workers": 4,
# The number of threads for parallel freeze of tables
"freeze_threads": 4,
# The number of threads for parallel drop replica
"drop_replica_threads": 8,
},
"pipeline": {
# Is asynchronous pipelines used (based on Pypeln library)
Expand Down

0 comments on commit cd08798

Please sign in to comment.