Skip to content

Commit

Permalink
Set fixed restore method for table engines and refactor (#171)
Browse files Browse the repository at this point in the history
* Set fixed restore method for table engines and refactor

* fix

* Test fix

* Test fix

* fix

* Fix for 23.3

* Review fix
  • Loading branch information
MikhailBurdukov authored Jul 16, 2024
1 parent 248e0ce commit 73f7dd8
Show file tree
Hide file tree
Showing 7 changed files with 124 additions and 145 deletions.
5 changes: 2 additions & 3 deletions ch_backup/backup/deduplication.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,7 @@
from ch_backup.backup.layout import BackupLayout
from ch_backup.backup.metadata import BackupMetadata, BackupState, PartMetadata
from ch_backup.backup_context import BackupContext
from ch_backup.clickhouse.models import Database, FrozenPart
from ch_backup.clickhouse.schema import is_replicated
from ch_backup.clickhouse.models import Database, FrozenPart, Table
from ch_backup.util import Slotted, utcnow


Expand Down Expand Up @@ -172,7 +171,7 @@ def _populate_dedup_info(
for db in databases_to_iterate:
db_dedup_info = dedup_info[db.name]
for table in backup.get_tables(db.name):
replicated = is_replicated(table.engine)
replicated = Table.engine_is_replicated(table.engine)
if replicated and db.replicated_tables_handled:
continue
if not replicated and (
Expand Down
40 changes: 12 additions & 28 deletions ch_backup/clickhouse/control.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
from ch_backup.calculators import calc_aligned_files_size
from ch_backup.clickhouse.client import ClickhouseClient
from ch_backup.clickhouse.models import Database, Disk, FrozenPart, Table
from ch_backup.clickhouse.schema import is_replicated
from ch_backup.exceptions import ClickhouseBackupError
from ch_backup.util import (
chown_dir_contents,
Expand Down Expand Up @@ -63,6 +62,7 @@
SELECT
database,
name,
engine,
create_table_query
FROM system.tables
WHERE ({db_condition})
Expand All @@ -72,23 +72,6 @@
"""
)

GET_TABLE_SQL = strip_query(
"""
SELECT
database,
name,
engine,
engine_full,
create_table_query,
data_paths,
metadata_path,
uuid
FROM system.tables
WHERE database = '{db_name}' AND name = '{table_name}'
FORMAT JSON
"""
)

CHECK_TABLE_SQL = strip_query(
"""
SELECT count()
Expand Down Expand Up @@ -575,19 +558,20 @@ def get_tables(

return result

def get_table(self, db_name: str, table_name: str) -> Optional[Table]:
def get_table(
self, db_name: str, table_name: str, short_query: bool = False
) -> Optional[Table]:
"""
Get table by name; return None if no such table is found.
"""
query_sql = GET_TABLE_SQL.format(
db_name=escape(db_name), table_name=escape(table_name)
)
tables = self.get_tables(db_name, [table_name], short_query)

result = self._ch_client.query(query_sql)["data"]
if result:
return self._make_table(result[0])
if len(tables) > 1:
raise RuntimeError(
f"Found several tables, when expected to find single table: database {db_name}, table {table_name}"
)

return None
return tables[0] if len(tables) == 1 else None

def does_table_exist(self, db_name: str, table_name: str) -> bool:
"""
Expand Down Expand Up @@ -632,7 +616,7 @@ def restore_replica(self, table: Table) -> None:
"""
Call SYSTEM RESTORE REPLICA for table.
"""
assert is_replicated(table.create_statement)
assert table.is_replicated()
self._ch_client.query(
RESTORE_REPLICA_SQL.format(
db_name=escape(table.database), table_name=escape(table.name)
Expand Down Expand Up @@ -923,7 +907,7 @@ def _make_table(self, record: dict) -> Table:
engine=record.get("engine", None),
disks=list(self._disks.values()),
data_paths=(
record.get("data_paths", None)
record.get("data_paths", [])
if "MergeTree" in record.get("engine", "")
else []
),
Expand Down
58 changes: 58 additions & 0 deletions ch_backup/clickhouse/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,64 @@ def _map_paths_to_disks(
)
)

def is_replicated(self) -> bool:
    """
    Tell whether this table uses a replicated MergeTree-family engine.

    The decision is delegated to Table.engine_is_replicated, which is given
    this table's engine name.
    """
    engine_name = self.engine
    return Table.engine_is_replicated(engine_name)

@staticmethod
def engine_is_replicated(engine: str) -> bool:
"""
A static method for determining whether an engine is replicated or not.
"""
return "MergeTree" in engine and "Replicated" in engine

def is_merge_tree(self) -> bool:
    """
    Tell whether this table's engine belongs to the MergeTree family.

    Returns True when the engine name contains the substring "MergeTree",
    and False otherwise.
    """
    # Membership test instead of comparing str.find() against -1.
    return "MergeTree" in self.engine

def is_view(self) -> bool:
    """
    Tell whether this table's engine is a view engine.

    The engine name must be exactly "View", "LiveView" or "MaterializedView".
    """
    view_engines = ("View", "LiveView", "MaterializedView")
    return self.engine in view_engines

def is_distributed(self) -> bool:
    """
    Tell whether this table's engine is exactly "Distributed".
    """
    engine_name = self.engine
    return engine_name == "Distributed"

def is_materialized_view(self) -> bool:
    """
    Tell whether this table's engine is exactly "MaterializedView".
    """
    engine_name = self.engine
    return engine_name == "MaterializedView"

def is_external_engine(self) -> bool:
    """
    Tell whether this table's engine integrates with an external system.

    Returns True only when the engine name exactly matches one of the
    names listed below.
    """
    external_engines = (
        "COSN",
        "ExternalDistributed",
        "HDFS",
        "Hive",
        "JDBC",
        "Kafka",
        "MeiliSearch",
        "MongoDB",
        "MySQL",
        "ODBC",
        "PostgreSQL",
        "RabbitMQ",
        "S3",
        "URL",
    )
    return self.engine in external_engines

def __hash__(self):
return hash((self.database, self.name))

Expand Down
74 changes: 2 additions & 72 deletions ch_backup/clickhouse/schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,75 +9,6 @@
from ch_backup.util import escape


def is_merge_tree(engine: str) -> bool:
    """
    Tell whether a table engine belongs to the MergeTree family.

    Returns True when the engine name contains the substring "MergeTree",
    and False otherwise.
    """
    # Membership test instead of comparing str.find() against -1.
    return "MergeTree" in engine


def is_replicated(engine: str) -> bool:
    """
    Tell whether a table engine name contains the substring "Replicated".

    Only that substring is checked; whether the name also contains
    "MergeTree" is not examined.
    """
    # Membership test instead of comparing str.find() against -1.
    return "Replicated" in engine


def is_distributed(engine: str) -> bool:
    """
    Tell whether the given table engine is exactly "Distributed".
    """
    distributed_engine = "Distributed"
    return engine == distributed_engine


def is_view(engine: str) -> bool:
    """
    Tell whether the given table engine is a view engine.

    The engine name must be exactly "View", "LiveView" or "MaterializedView".
    """
    view_engines = ("View", "LiveView", "MaterializedView")
    return engine in view_engines


def is_materialized_view(engine: str) -> bool:
    """
    Tell whether the given table engine is exactly "MaterializedView".
    """
    materialized_view_engine = "MaterializedView"
    return engine == materialized_view_engine


def is_external_engine(engine: str) -> bool:
    """
    Tell whether the given table engine integrates with an external system.

    Returns True only when the engine name exactly matches one of the
    names listed below.
    """
    external_engines = (
        "COSN",
        "ExternalDistributed",
        "HDFS",
        "Hive",
        "JDBC",
        "Kafka",
        "MeiliSearch",
        "MongoDB",
        "MySQL",
        "ODBC",
        "PostgreSQL",
        "RabbitMQ",
        "S3",
        "URL",
    )
    return engine in external_engines


def is_external_db_engine(db_engine: str) -> bool:
    """
    Tell whether the given database engine integrates with an external system.

    Returns True only when the engine name exactly matches one of the
    names listed below.
    """
    external_db_engines = (
        "MySQL",
        "MaterializedMySQL",
        "PostgreSQL",
        "MaterializedPostgreSQL",
    )
    return db_engine in external_db_engines


def to_attach_query(create_query: str) -> str:
"""
Convert CREATE table query to ATTACH one.
Expand Down Expand Up @@ -117,7 +48,7 @@ def rewrite_table_schema(
)
create_statement = create_statement.replace("MergeTree()", "MergeTree")
table.create_statement = create_statement
if is_replicated(table.engine):
if table.is_replicated():
table.engine = table.engine.replace("Replicated", "")

if override_replica_name:
Expand Down Expand Up @@ -182,8 +113,7 @@ def _add_uuid(table: Table, inner_uuid: str = None) -> None:
f" Metadata UUID is {table.uuid}. return without adding UUID"
)
return

if is_view(table.engine):
if table.is_view():
inner_uuid_clause = f"TO INNER UUID '{inner_uuid}'" if inner_uuid else ""
table.create_statement = re.sub(
f"^(?P<statement>CREATE|ATTACH) (?P<view_type>((MATERIALIZED|LIVE) )?VIEW) "
Expand Down
Loading

0 comments on commit 73f7dd8

Please sign in to comment.