diff --git a/ch_backup/backup/deduplication.py b/ch_backup/backup/deduplication.py index 69375acf..b703a825 100644 --- a/ch_backup/backup/deduplication.py +++ b/ch_backup/backup/deduplication.py @@ -11,8 +11,7 @@ from ch_backup.backup.layout import BackupLayout from ch_backup.backup.metadata import BackupMetadata, BackupState, PartMetadata from ch_backup.backup_context import BackupContext -from ch_backup.clickhouse.models import Database, FrozenPart -from ch_backup.clickhouse.schema import is_replicated +from ch_backup.clickhouse.models import Database, FrozenPart, Table from ch_backup.util import Slotted, utcnow @@ -172,7 +171,7 @@ def _populate_dedup_info( for db in databases_to_iterate: db_dedup_info = dedup_info[db.name] for table in backup.get_tables(db.name): - replicated = is_replicated(table.engine) + replicated = Table.engine_is_replicated(table.engine) if replicated and db.replicated_tables_handled: continue if not replicated and ( diff --git a/ch_backup/clickhouse/control.py b/ch_backup/clickhouse/control.py index 70ff489e..c01c88b1 100644 --- a/ch_backup/clickhouse/control.py +++ b/ch_backup/clickhouse/control.py @@ -20,7 +20,6 @@ from ch_backup.calculators import calc_aligned_files_size from ch_backup.clickhouse.client import ClickhouseClient from ch_backup.clickhouse.models import Database, Disk, FrozenPart, Table -from ch_backup.clickhouse.schema import is_replicated from ch_backup.exceptions import ClickhouseBackupError from ch_backup.util import ( chown_dir_contents, @@ -63,6 +62,7 @@ SELECT database, name, + engine, create_table_query FROM system.tables WHERE ({db_condition}) @@ -72,23 +72,6 @@ """ ) -GET_TABLE_SQL = strip_query( - """ - SELECT - database, - name, - engine, - engine_full, - create_table_query, - data_paths, - metadata_path, - uuid - FROM system.tables - WHERE database = '{db_name}' AND name = '{table_name}' - FORMAT JSON -""" -) - CHECK_TABLE_SQL = strip_query( """ SELECT count() @@ -575,19 +558,20 @@ def get_tables( return 
result - def get_table(self, db_name: str, table_name: str) -> Optional[Table]: + def get_table( + self, db_name: str, table_name: str, short_query: bool = False + ) -> Optional[Table]: """ Get table by name, returns None if no table has found. """ - query_sql = GET_TABLE_SQL.format( - db_name=escape(db_name), table_name=escape(table_name) - ) + tables = self.get_tables(db_name, [table_name], short_query) - result = self._ch_client.query(query_sql)["data"] - if result: - return self._make_table(result[0]) + if len(tables) > 1: + raise RuntimeError( + f"Found several tables, when expected to find single table: database {db_name}, table {table_name}" + ) - return None + return tables[0] if len(tables) == 1 else None def does_table_exist(self, db_name: str, table_name: str) -> bool: """ @@ -632,7 +616,7 @@ def restore_replica(self, table: Table) -> None: """ Call SYSTEM RESTORE REPLICA for table. """ - assert is_replicated(table.create_statement) + assert table.is_replicated() self._ch_client.query( RESTORE_REPLICA_SQL.format( db_name=escape(table.database), table_name=escape(table.name) @@ -923,7 +907,7 @@ def _make_table(self, record: dict) -> Table: engine=record.get("engine", None), disks=list(self._disks.values()), data_paths=( - record.get("data_paths", None) + record.get("data_paths", []) if "MergeTree" in record.get("engine", "") else [] ), diff --git a/ch_backup/clickhouse/models.py b/ch_backup/clickhouse/models.py index e56c737f..10679373 100644 --- a/ch_backup/clickhouse/models.py +++ b/ch_backup/clickhouse/models.py @@ -83,6 +83,64 @@ def _map_paths_to_disks( ) ) + def is_replicated(self) -> bool: + """ + Return True if table engine belongs to replicated merge tree table engine family, or False otherwise. + """ + return Table.engine_is_replicated(self.engine) + + @staticmethod + def engine_is_replicated(engine: str) -> bool: + """ + A static method for determining whether an engine is replicated or not. 
+ """ + return "MergeTree" in engine and "Replicated" in engine + + def is_merge_tree(self) -> bool: + """ + Return True if table engine belongs to merge tree table engine family, or False otherwise. + """ + return self.engine.find("MergeTree") != -1 + + def is_view(self) -> bool: + """ + Return True if table engine is a view (either View or MaterializedView), or False otherwise. + """ + return self.engine in ("View", "LiveView", "MaterializedView") + + def is_distributed(self) -> bool: + """ + Return True if it's Distributed table engine, or False otherwise. + """ + return self.engine == "Distributed" + + def is_materialized_view(self) -> bool: + """ + Return True if it's MaterializedView table engine, or False otherwise. + """ + return self.engine == "MaterializedView" + + def is_external_engine(self) -> bool: + """ + Return True if the specified table engine is intended to use for integration with external systems. + """ + return self.engine in ( + "COSN", + "ExternalDistributed", + "HDFS", + "Hive", + "JDBC", + "Kafka", + "MeiliSearch", + "MongoDB", + "MySQL", + "ODBC", + "PostgreSQL", + "RabbitMQ", + "S3", + "URL", + ) + def __hash__(self): return hash((self.database, self.name)) diff --git a/ch_backup/clickhouse/schema.py b/ch_backup/clickhouse/schema.py index 019e476b..a02674f5 100644 --- a/ch_backup/clickhouse/schema.py +++ b/ch_backup/clickhouse/schema.py @@ -9,75 +9,6 @@ from ch_backup.util import escape -def is_merge_tree(engine: str) -> bool: - """ - Return True if table engine belongs to merge tree table engine family, or False otherwise. - """ - return engine.find("MergeTree") != -1 - - -def is_replicated(engine: str) -> bool: - """ - Return True if table engine belongs to replicated merge tree table engine family, or False otherwise. - """ - return engine.find("Replicated") != -1 - - -def is_distributed(engine: str) -> bool: - """ - Return True if it's Distributed table engine, or False otherwise. 
- """ - return engine == "Distributed" - - -def is_view(engine: str) -> bool: - """ - Return True if table engine is a view (either View or MaterializedView), or False otherwise. - """ - return engine in ("View", "LiveView", "MaterializedView") - - -def is_materialized_view(engine: str) -> bool: - """ - Return True if it's MaterializedView table engine, or False otherwise. - """ - return engine == "MaterializedView" - - -def is_external_engine(engine: str) -> bool: - """ - Return True if the specified table engine is intended to use for integration with external systems. - """ - return engine in ( - "COSN", - "ExternalDistributed", - "HDFS", - "Hive", - "JDBC", - "Kafka", - "MeiliSearch", - "MongoDB", - "MySQL", - "ODBC", - "PostgreSQL", - "RabbitMQ", - "S3", - "URL", - ) - - -def is_external_db_engine(db_engine: str) -> bool: - """ - Return True if the specified database engine is intended to use for integration with external systems. - """ - return db_engine in ( - "MySQL", - "MaterializedMySQL", - "PostgreSQL", - "MaterializedPostgreSQL", - ) - - def to_attach_query(create_query: str) -> str: """ Convert CREATE table query to ATTACH one. @@ -117,7 +48,7 @@ def rewrite_table_schema( ) create_statement = create_statement.replace("MergeTree()", "MergeTree") table.create_statement = create_statement - if is_replicated(table.engine): + if table.is_replicated(): table.engine = table.engine.replace("Replicated", "") if override_replica_name: @@ -182,8 +113,7 @@ def _add_uuid(table: Table, inner_uuid: str = None) -> None: f" Metadata UUID is {table.uuid}. 
return without adding UUID" ) return - - if is_view(table.engine): + if table.is_view(): inner_uuid_clause = f"TO INNER UUID '{inner_uuid}'" if inner_uuid else "" table.create_statement = re.sub( f"^(?PCREATE|ATTACH) (?P((MATERIALIZED|LIVE) )?VIEW) " diff --git a/ch_backup/logic/table.py b/ch_backup/logic/table.py index 0e8c65ad..c57f1f12 100644 --- a/ch_backup/logic/table.py +++ b/ch_backup/logic/table.py @@ -20,11 +20,6 @@ from ch_backup.clickhouse.disks import ClickHouseTemporaryDisks from ch_backup.clickhouse.models import Database, FrozenPart, Table from ch_backup.clickhouse.schema import ( - is_distributed, - is_materialized_view, - is_merge_tree, - is_replicated, - is_view, rewrite_table_schema, to_attach_query, to_create_query, @@ -205,7 +200,7 @@ def _freeze_table( return None # Freeze only MergeTree tables - if not schema_only and is_merge_tree(table.engine): + if not schema_only and table.is_merge_tree(): try: context.ch_ctl.freeze_table(backup_name, table) except ClickhouseError: @@ -358,10 +353,9 @@ def restore( return failed_tables_names = [f"`{t.database}`.`{t.name}`" for t in failed_tables] - tables_to_restore_data = filter(lambda t: is_merge_tree(t.engine), tables_meta) tables_to_restore_data = filter( lambda t: f"`{t.database}`.`{t.name}`" not in failed_tables_names, - tables_to_restore_data, + tables_meta, ) use_inplace_cloud_restore = context.config_root["restore"][ @@ -465,7 +459,7 @@ def deduplicate_parts_in_batch( ) frozen_parts.clear() - if not is_merge_tree(table.engine): + if not table.is_merge_tree(): logging.info( 'Skipping table data backup for non MergeTree table "{}"."{}"', table.database, @@ -611,11 +605,11 @@ def _restore_tables( context, databases[table.database], table, add_uuid_if_required=True ) - if is_merge_tree(table.engine): + if table.is_merge_tree(): merge_tree_tables.append(table) - elif is_distributed(table.engine): + elif table.is_distributed(): distributed_tables.append(table) - elif is_view(table.engine): + elif 
table.is_view(): view_tables.append(table) else: other_tables.append(table) @@ -623,7 +617,7 @@ if clean_zookeeper and len(context.zk_config.get("hosts")) > 0: # type: ignore macros = context.ch_ctl.get_macros() replicated_tables = [ - table for table in merge_tree_tables if is_replicated(table.engine) + table for table in merge_tree_tables if table.is_replicated() ] logging.debug( @@ -654,20 +648,34 @@ def _restore_data( for table_meta in tables: cloud_storage_parts = [] try: + maybe_table_short = context.ch_ctl.get_table( + table_meta.database, table_meta.name, short_query=True + ) + if not maybe_table_short: + raise ClickhouseBackupError( + f"Table not found {table_meta.database}.{table_meta.name}" + ) + + # We have to check table engine on short Table version + # because some of columns might be inaccessible, for old ch versions. + # Fix https://github.com/ClickHouse/ClickHouse/pull/55540 is presented since 23.8. + if not maybe_table_short.is_merge_tree(): + logging.debug( + 'Skip table "{}.{}" data restore, because it is not MergeTree family.', + table_meta.database, + table_meta.name, + ) + continue + logging.debug( 'Running table "{}.{}" data restore', table_meta.database, table_meta.name, ) - maybe_table = context.ch_ctl.get_table( + table: Table = context.ch_ctl.get_table( table_meta.database, table_meta.name - ) - assert ( - maybe_table is not None - ), f"Table not found {table_meta.database}.{table_meta.name}" - table: Table = maybe_table - + ) # type: ignore attach_parts = [] for part in table_meta.get_parts(): if context.restore_context.part_restored(part): @@ -728,7 +736,7 @@ for part in attach_parts: logging.debug( 'Attaching "{}.{}" part: {}', - table_meta.database, + table.database, table.name, part.name, ) @@ -738,7 +746,7 @@ except Exception as e: logging.warning( 'Attaching "{}.{}" part {} failed: {}', - table_meta.database, + table.database, table.name, part.name, repr(e), @@ 
-763,9 +771,7 @@ def _rewrite_table_schema( if add_uuid_if_required and table.uuid and db.is_atomic(): add_uuid = True # Starting with 21.4 it's required to explicitly set inner table UUID for materialized views. - if is_materialized_view(table.engine) and context.ch_ctl.ch_version_ge( - "21.4" - ): + if table.is_materialized_view() and context.ch_ctl.ch_version_ge("21.4"): inner_table = context.ch_ctl.get_table( table.database, f".inner_id.{table.uuid}" ) @@ -845,22 +851,22 @@ def _restore_table_object( context: BackupContext, db: Database, table: Table ) -> None: try: - try: + if ( + table.is_merge_tree() + or table.is_view() + or table.is_external_engine() + or table.is_distributed() + ): logging.debug( f"Trying to restore table `{db.name}`.`{table.name}` by ATTACH method" ) table.create_statement = to_attach_query(table.create_statement) context.ch_ctl.create_table(table) - if ( - is_replicated(table.create_statement) - and not is_materialized_view(table.engine) - and context.ch_ctl.ch_version_ge("21.8") - ): - context.ch_ctl.restore_replica(table) - except Exception as e: - logging.warning( - f"Failed to restore table `{db.name}`.`{table.name}` using ATTACH method, " - f"fallback to CREATE. 
Exception: {e}" + if table.is_replicated(): + return context.ch_ctl.restore_replica(table) + else: + logging.debug( + f"Trying to restore table `{db.name}`.`{table.name}` by CREATE method" ) table.create_statement = to_create_query(table.create_statement) context.ch_ctl.create_table(table) diff --git a/tests/integration/features/ddl_dictionary.feature b/tests/integration/features/ddl_dictionary.feature index bf426e12..f0122a9d 100644 --- a/tests/integration/features/ddl_dictionary.feature +++ b/tests/integration/features/ddl_dictionary.feature @@ -72,14 +72,14 @@ Feature: Backup dictionary created by ddl """ CREATE DATABASE test_db Engine=Ordinary; - CREATE TABLE test_db.table_01 (n1 UInt32, n2 UInt32) + CREATE TABLE test_db.table_01 (n1 UInt64, n2 UInt32) ENGINE MergeTree PARTITION BY tuple() ORDER BY n1; - CREATE DICTIONARY test_db.test_dictionary (n1 UInt32, n2 UInt32) + CREATE DICTIONARY test_db.test_dictionary (n1 UInt64, n2 UInt32) PRIMARY KEY n1 LAYOUT(hashed()) LIFETIME(MIN 1 MAX 10) SOURCE(CLICKHOUSE(HOST 'localhost' PORT 9000 DB 'test_db' TABLE 'table_01' USER 'default')); - ATTACH TABLE test_db.dict_table (`n1` UInt32, `n2` UInt32) ENGINE = Dictionary('test_db.test_dictionary'); + ATTACH TABLE test_db.dict_table (`n1` UInt64, `n2` UInt32) ENGINE = Dictionary('test_db.test_dictionary'); """ When we create clickhouse01 clickhouse backup Then we got the following backups on clickhouse01 diff --git a/tests/unit/test_schema.py b/tests/unit/test_schema.py index 459f25ef..6ff7940e 100644 --- a/tests/unit/test_schema.py +++ b/tests/unit/test_schema.py @@ -5,7 +5,7 @@ from tests.unit.utils import parametrize from ch_backup.clickhouse.models import Table -from ch_backup.clickhouse.schema import is_merge_tree, is_view, rewrite_table_schema +from ch_backup.clickhouse.schema import rewrite_table_schema UUID = "223b4576-76f0-4ed3-976f-46db82af82a9" INNER_UUID = "fa8ff291-1922-4b7f-afa7-06633d5e16ae" @@ -42,7 +42,8 @@ }, ) def test_is_merge_tree(engine, result): - 
assert is_merge_tree(engine) == result + table = Table("test", "test", engine, [], [], "", "", None) + assert table.is_merge_tree() == result @parametrize( @@ -76,7 +77,8 @@ def test_is_merge_tree(engine, result): }, ) def test_is_view(engine, result): - assert is_view(engine) == result + table = Table("test", "test", engine, [], [], "", "", None) + assert table.is_view() == result @parametrize(