Add cleanup for database
MikhailBurdukov committed Sep 10, 2024
1 parent 81997e4 commit 08604eb
Showing 13 changed files with 164 additions and 35 deletions.
10 changes: 9 additions & 1 deletion ch_backup/backup/metadata/backup_metadata.py
@@ -228,7 +228,13 @@ def get_database(self, db_name: str) -> Database:
Get database.
"""
db_meta = self._databases[db_name]
return Database(db_name, db_meta.get("engine"), db_meta.get("metadata_path"))
return Database(
db_name,
db_meta.get("engine"),
db_meta.get("metadata_path"),
db_meta.get("uuid"),
db_meta.get("engine_full"),
)

def add_database(self, db: Database) -> None:
"""
@@ -239,6 +245,8 @@ def add_database(self, db: Database) -> None:
self._databases[db.name] = {
"engine": db.engine,
"metadata_path": db.metadata_path,
"uuid": db.uuid,
"engine_full": db.engine_full,
"tables": {},
}
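
Together, these two hunks persist the new uuid and engine_full fields into the backup metadata and read them back. A minimal round-trip sketch, assuming an existing BackupMetadata instance and hypothetical field values:

    # backup_meta: an existing BackupMetadata; all values below are hypothetical.
    db = Database(
        "db_repl",
        "Replicated",
        "/var/lib/clickhouse/metadata/db_repl.sql",
        "123e4567-e89b-12d3-a456-426614174000",
        "Replicated('/databases/{uuid}/db_repl', 'shard_01', '{replica}')",
    )
    backup_meta.add_database(db)
    restored = backup_meta.get_database("db_repl")
    assert restored.uuid == db.uuid and restored.engine_full == db.engine_full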

18 changes: 15 additions & 3 deletions ch_backup/ch_backup.py
@@ -16,6 +16,7 @@
from ch_backup.backup.metadata import BackupMetadata, BackupState, TableMetadata
from ch_backup.backup.sources import BackupSources
from ch_backup.backup_context import BackupContext
from ch_backup.clickhouse.metadata_cleaner import MetadataCleaner, select_replica_drop
from ch_backup.clickhouse.models import Database
from ch_backup.config import Config
from ch_backup.exceptions import (
@@ -514,8 +515,20 @@ def _restore(
db.set_engine_from_sql(db_sql)
databases[db_name] = db

metadata_cleaner: Optional[MetadataCleaner] = None

if clean_zookeeper and len(self._context.zk_config.get("hosts")) > 0:
metadata_cleaner = MetadataCleaner(
self._context.ch_ctl,
select_replica_drop(
replica_name, self._context.ch_ctl.get_macros()
),
)

# Restore databases.
self._database_backup_manager.restore(self._context, databases, keep_going)
self._database_backup_manager.restore(
self._context, databases, keep_going, metadata_cleaner
)

# Restore tables and data stored on local disks.
self._table_backup_manager.restore(
@@ -524,12 +537,11 @@
schema_only=sources.schema_only,
tables=tables,
exclude_tables=exclude_tables,
replica_name=replica_name,
metadata_cleaner=metadata_cleaner,
cloud_storage_source_bucket=cloud_storage_source_bucket,
cloud_storage_source_path=cloud_storage_source_path,
cloud_storage_source_endpoint=cloud_storage_source_endpoint,
skip_cloud_storage=skip_cloud_storage,
clean_zookeeper=clean_zookeeper,
keep_going=keep_going,
)
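
Note: the cleaner is now built once in _restore and threaded through both the database and table restore paths (see ch_backup/logic/database.py and ch_backup/logic/table.py below). A rough sketch of the wiring, with an assumed macros dict; select_replica_drop presumably falls back to the host's own 'replica' macro when no explicit replica name is passed:

    macros = {"replica": "clickhouse01"}        # assumed result of ch_ctl.get_macros()
    replica = select_replica_drop(None, macros) # presumably resolves to "clickhouse01"
    cleaner = MetadataCleaner(ch_ctl, replica)  # ch_ctl: an existing ClickhouseCTL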

29 changes: 27 additions & 2 deletions ch_backup/clickhouse/control.py
@@ -161,12 +161,20 @@
"""
)

DROP_DATABASE_REPLICA_BY_ZK_PATH_SQL = strip_query(
"""
SYSTEM DROP DATABASE REPLICA '{replica_name}' FROM ZKPATH '{zk_path}'
"""
)

GET_DATABASES_SQL = strip_query(
"""
SELECT
name,
engine,
metadata_path
metadata_path,
uuid,
engine_full
FROM system.databases
WHERE name NOT IN ('system', '_temporary_and_external_tables', 'information_schema', 'INFORMATION_SCHEMA', '{system_db}')
FORMAT JSON
@@ -515,7 +523,13 @@ def get_databases(
)
if "data" in ch_resp:
result = [
Database(row["name"], row["engine"], row["metadata_path"])
Database(
row["name"],
row["engine"],
row["metadata_path"],
row["uuid"],
row["engine_full"],
)
for row in ch_resp["data"]
if row["name"] not in exclude_dbs
]
@@ -684,6 +698,17 @@ def system_drop_replica(self, replica: str, zookeeper_path: str) -> None:
timeout=self._drop_replica_timeout,
)

def system_drop_database_replica(self, replica: str, zookeeper_path: str) -> None:
"""
System drop database replica query.
"""
self._ch_client.query(
DROP_DATABASE_REPLICA_BY_ZK_PATH_SQL.format(
replica_name=replica, zk_path=zookeeper_path
),
timeout=self._drop_replica_timeout,
)

def get_database_metadata_path(self, database: str) -> str:
"""
Get filesystem absolute path to database metadata.
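For reference, the new statement rendered with an assumed shard/replica pair and ZooKeeper path:

    query = DROP_DATABASE_REPLICA_BY_ZK_PATH_SQL.format(
        replica_name="shard_01|clickhouse01",  # assumed "{shard}|{replica}" pair
        zk_path="/databases/db_repl",          # assumed resolved ZooKeeper path
    )
    # -> SYSTEM DROP DATABASE REPLICA 'shard_01|clickhouse01' FROM ZKPATH '/databases/db_repl'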
40 changes: 38 additions & 2 deletions ch_backup/clickhouse/metadata_cleaner.py
@@ -8,9 +8,13 @@
from ch_backup import logging
from ch_backup.clickhouse.client import ClickhouseError
from ch_backup.clickhouse.control import ClickhouseCTL
from ch_backup.clickhouse.models import Table
from ch_backup.clickhouse.models import Database, Table
from ch_backup.exceptions import ConfigurationError
from ch_backup.util import get_table_zookeeper_paths, replace_macros
from ch_backup.util import (
get_database_zookeeper_paths,
get_table_zookeeper_paths,
replace_macros,
)


def select_replica_drop(replica_name: Optional[str], macros: Dict) -> str:
@@ -77,3 +81,35 @@ def clean_tables_metadata(self, replicated_tables: List[Table]) -> None:
)
else:
raise

def clean_database_metadata(self, replicated_databases: List[Database]) -> None:
"""
Remove database replica metadata from zookeeper.
"""
replicated_databases_paths = get_database_zookeeper_paths(replicated_databases)

for [database, database_path, shard] in replicated_databases_paths:
db_macros = dict(database=database.name, uuid=database.uuid)
db_macros.update(self._macros)

path_resolved = os.path.abspath(replace_macros(database_path, db_macros))
full_replica_name = f"{shard}|{self._replica_to_drop}"

logging.debug(
"Removing replica {} from database {} metadata from zookeeper {}.",
full_replica_name,
database.name,
path_resolved,
)
try:
self._ch_ctl.system_drop_database_replica(
replica=full_replica_name, zookeeper_path=path_resolved
)
except ClickhouseError as ch_error:
if "does not look like a path of Replicated database" in str(ch_error):
logging.warning(
"System drop database replica failed with: {}\n Will ignore it, probably different configuration for zookeeper or database schema.",
repr(ch_error),
)
else:
raise
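
The zookeeper path handed to system_drop_database_replica is the engine's path with the database macros expanded. A small sketch of that resolution, with hypothetical values:

    import os
    from ch_backup.util import replace_macros

    db_macros = {"database": "db_repl", "uuid": "123e4567-e89b-12d3-a456-426614174000"}
    path = os.path.abspath(replace_macros("/databases/{uuid}/db_repl", db_macros))
    # path == "/databases/123e4567-e89b-12d3-a456-426614174000/db_repl"
    full_replica_name = "shard_01|clickhouse01"  # "{shard}|{replica_to_drop}"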
9 changes: 8 additions & 1 deletion ch_backup/clickhouse/models.py
@@ -167,12 +167,19 @@ class Database(SimpleNamespace):
"""

def __init__(
self, name: str, engine: Optional[str], metadata_path: Optional[str]
self,
name: str,
engine: Optional[str],
metadata_path: Optional[str],
uuid: Optional[str],
engine_full: Optional[str],
) -> None:
super().__init__()
self.name = name
self.engine = engine
self.metadata_path = metadata_path
self.uuid = uuid
self.engine_full = engine_full

def is_atomic(self) -> bool:
"""
16 changes: 14 additions & 2 deletions ch_backup/logic/database.py
@@ -2,10 +2,11 @@
Clickhouse backup logic for databases
"""

from typing import Dict, Sequence
from typing import Dict, Optional, Sequence

from ch_backup import logging
from ch_backup.backup_context import BackupContext
from ch_backup.clickhouse.metadata_cleaner import MetadataCleaner
from ch_backup.clickhouse.models import Database
from ch_backup.clickhouse.schema import (
embedded_schema_db_sql,
@@ -32,7 +33,10 @@ def backup(self, context: BackupContext, databases: Sequence[Database]) -> None:

@staticmethod
def restore(
context: BackupContext, databases: Dict[str, Database], keep_going: bool
context: BackupContext,
databases: Dict[str, Database],
keep_going: bool,
metadata_cleaner: Optional[MetadataCleaner],
) -> None:
"""
Restore database objects.
@@ -56,6 +60,14 @@ def restore(
databases_to_restore[name] = db
continue

if metadata_cleaner:
replicated_databases = [
database
for [_, database] in databases_to_restore.items()
if database.is_replicated_db_engine()
]
metadata_cleaner.clean_database_metadata(replicated_databases)

logging.info("Restoring databases: {}", ", ".join(databases_to_restore.keys()))
for db in databases_to_restore.values():
if db.has_embedded_metadata():
18 changes: 5 additions & 13 deletions ch_backup/logic/table.py
@@ -18,7 +18,7 @@
from ch_backup.backup_context import BackupContext
from ch_backup.clickhouse.client import ClickhouseError
from ch_backup.clickhouse.disks import ClickHouseTemporaryDisks
from ch_backup.clickhouse.metadata_cleaner import MetadataCleaner, select_replica_drop
from ch_backup.clickhouse.metadata_cleaner import MetadataCleaner
from ch_backup.clickhouse.models import Database, FrozenPart, Table
from ch_backup.clickhouse.schema import (
rewrite_table_schema,
@@ -272,12 +272,11 @@ def restore(
schema_only: bool,
tables: List[TableMetadata],
exclude_tables: List[TableMetadata],
replica_name: Optional[str],
metadata_cleaner: Optional[MetadataCleaner],
cloud_storage_source_bucket: Optional[str],
cloud_storage_source_path: Optional[str],
cloud_storage_source_endpoint: Optional[str],
skip_cloud_storage: bool,
clean_zookeeper: bool,
keep_going: bool,
) -> None:
"""
@@ -342,8 +341,7 @@
context,
databases,
tables_to_restore,
clean_zookeeper,
replica_name,
metadata_cleaner,
keep_going,
)

@@ -588,8 +586,7 @@ def _restore_tables(
context: BackupContext,
databases: Dict[str, Database],
tables: Iterable[Table],
clean_zookeeper: bool = False,
replica_name: Optional[str] = None,
metadata_cleaner: Optional[MetadataCleaner],
keep_going: bool = False,
) -> List[Table]:
logging.info("Preparing tables for restoring")
@@ -615,12 +612,7 @@
else:
other_tables.append(table)

metadata_cleaner = MetadataCleaner(
context.ch_ctl,
select_replica_drop(replica_name, context.ch_ctl.get_macros()),
)

if clean_zookeeper and len(context.zk_config.get("hosts")) > 0: # type: ignore
if metadata_cleaner: # type: ignore
replicated_tables = [
table for table in merge_tree_tables if table.is_replicated()
]
12 changes: 5 additions & 7 deletions ch_backup/util.py
@@ -301,21 +301,19 @@ def get_table_zookeeper_paths(tables: Iterable) -> Iterable[Tuple]:
return result


def get_database_zookeeper_paths(databases: Iterable[str]) -> Iterable[str]:
def get_database_zookeeper_paths(databases: Iterable) -> Iterable[Tuple]:
"""
Parse ZooKeeper path from create statement.
Parse the ZooKeeper path from the database create statement and return the database together with its ZooKeeper path and shard name.
"""
result = []
for db_sql in databases:
for database in databases:
match = re.search(
R"""Replicated\(\'(?P<zk_path>[^']+)\', '(?P<shard>[^']+)', '(?P<replica>[^']+)'""",
db_sql,
database.engine_full,
)
if not match:
continue
result.append(
f'{match.group("zk_path")}/replicas/{match.group("shard")}|{match.group("replica")}'
)
result.append((database, match.group("zk_path"), match.group("shard")))
return result
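
A minimal illustration of the new return shape, using a hypothetical Replicated database whose engine_full string matches the pattern the regex expects:

    from ch_backup.clickhouse.models import Database
    from ch_backup.util import get_database_zookeeper_paths

    db = Database(
        "db_repl",
        "Replicated",
        "/var/lib/clickhouse/metadata/db_repl.sql",  # hypothetical metadata path
        "123e4567-e89b-12d3-a456-426614174000",
        "Replicated('/databases/{uuid}/db_repl', 'shard_01', '{replica}')",
    )
    assert get_database_zookeeper_paths([db]) == [
        (db, "/databases/{uuid}/db_repl", "shard_01")
    ]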


26 changes: 24 additions & 2 deletions tests/integration/features/backup_replicated.feature
@@ -788,7 +788,7 @@ Feature: Backup replicated merge tree table
And we got same clickhouse data at clickhouse01 clickhouse02


Scenario: Host resetup with zookeeper cleanup
Scenario: Host resetup with zookeeper table cleanup
Given we have enabled shared zookeeper for clickhouse01
And we have enabled shared zookeeper for clickhouse02
And we have executed queries on clickhouse01
@@ -800,7 +800,7 @@
CounterID UInt32,
UserID UInt32
)
ENGINE = ReplicatedMergeTree('/clickhouse/tables/shard_01/test_db.table_01', '{replica}')
ENGINE = ReplicatedMergeTree('/clickhouse/tables/{uuid}/test_db.table_01', '{replica}')
PARTITION BY CounterID % 10
ORDER BY (CounterID, EventDate, intHash32(UserID))
SAMPLE BY intHash32(UserID);
@@ -819,3 +819,25 @@
"""
When we start clickhouse at clickhouse01
Then replica test_db.table_01 on clickhouse01 is read-only

Scenario: Host resetup with zookeeper database cleanup
Given we have enabled shared zookeeper for clickhouse01
And we have enabled shared zookeeper for clickhouse02
And we have executed queries on clickhouse01
"""
DROP DATABASE IF EXISTS db_repl SYNC;
CREATE DATABASE db_repl ENGINE = Replicated('/databases/{uuid}/db_repl', 'shard_01', '{replica}')
"""
When we create clickhouse01 clickhouse backup
Then we got the following backups on clickhouse01
| num | state | data_count | link_count |
| 0 | created | 0 | 0 |

When we stop clickhouse at clickhouse01
When we restore clickhouse backup #0 to clickhouse02
"""
replica_name: clickhouse01
schema_only: true
"""
When we start clickhouse at clickhouse01
Then database replica db_repl on clickhouse01 does not exist
7 changes: 7 additions & 0 deletions tests/integration/modules/clickhouse.py
@@ -264,6 +264,13 @@ def is_replica_ro(self, database: str, table: str) -> int:
)["data"][0]
return resp["is_readonly"]

def is_database_replica_exists(self, database: str) -> bool:
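"""
Check whether this host's replica of the Replicated database is registered in system.clusters.
"""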
resp = self._query(
"GET",
f"SELECT count() as cnt FROM system.clusters WHERE cluster='{database}' and database_replica_name='{{replica}}' FORMAT JSON",
)["data"][0]
return bool(resp["cnt"])

def drop_test_table(self, db_num: int, table_num: int) -> None:
"""
Drop test table.
6 changes: 6 additions & 0 deletions tests/integration/steps/clickhouse.py
@@ -288,3 +288,9 @@ def step_check_data_equal(context, node):
def step_check_replica_is_ro(context, table, database, node):
ch_client = ClickhouseClient(context, node)
assert ch_client.is_replica_ro(database, table)


@then("database replica {database} on {node:w} does not exists")
def step_check_no_database_replica(context, database, node):
ch_client = ClickhouseClient(context, node)
assert not ch_client.is_database_replica_exists(database)
4 changes: 3 additions & 1 deletion tests/unit/test_backup_tables.py
@@ -27,7 +27,9 @@ def test_backup_table_skipping_if_metadata_updated_during_backup(

# Prepare involved data objects
context = BackupContext(DEFAULT_CONFIG) # type: ignore[arg-type]
db = Database(db_name, "MergeTree", "/var/lib/clickhouse/metadata/db1.sql")
db = Database(
db_name, "MergeTree", "/var/lib/clickhouse/metadata/db1.sql", None, None
)
table_backup = TableBackup()
backup_meta = BackupMetadata(
name="20181017T210300",