Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Set fixed restore method for table engines and refactor #171

Merged
merged 8 commits into from
Jul 16, 2024
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 2 additions & 3 deletions ch_backup/backup/deduplication.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,7 @@
from ch_backup.backup.layout import BackupLayout
from ch_backup.backup.metadata import BackupMetadata, BackupState, PartMetadata
from ch_backup.backup_context import BackupContext
from ch_backup.clickhouse.models import Database, FrozenPart
from ch_backup.clickhouse.schema import is_replicated
from ch_backup.clickhouse.models import Database, FrozenPart, Table
from ch_backup.util import Slotted, utcnow


Expand Down Expand Up @@ -172,7 +171,7 @@ def _populate_dedup_info(
for db in databases_to_iterate:
db_dedup_info = dedup_info[db.name]
for table in backup.get_tables(db.name):
replicated = is_replicated(table.engine)
replicated = Table.engine_is_replicated(table.engine)
if replicated and db.replicated_tables_handled:
continue
if not replicated and (
Expand Down
39 changes: 9 additions & 30 deletions ch_backup/clickhouse/control.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
from ch_backup.calculators import calc_aligned_files_size
from ch_backup.clickhouse.client import ClickhouseClient
from ch_backup.clickhouse.models import Database, Disk, FrozenPart, Table
from ch_backup.clickhouse.schema import is_replicated
from ch_backup.exceptions import ClickhouseBackupError
from ch_backup.util import (
chown_dir_contents,
Expand Down Expand Up @@ -63,6 +62,8 @@
SELECT
database,
name,
create_table_query,
engine,
create_table_query
kirillgarbar marked this conversation as resolved.
Show resolved Hide resolved
FROM system.tables
WHERE ({db_condition})
Expand All @@ -72,23 +73,6 @@
"""
)

GET_TABLE_SQL = strip_query(
"""
SELECT
database,
name,
engine,
engine_full,
create_table_query,
data_paths,
metadata_path,
uuid
FROM system.tables
WHERE database = '{db_name}' AND name = '{table_name}'
FORMAT JSON
"""
)

CHECK_TABLE_SQL = strip_query(
"""
SELECT count()
Expand Down Expand Up @@ -575,19 +559,14 @@ def get_tables(

return result

def get_table(self, db_name: str, table_name: str) -> Optional[Table]:
def get_table(
self, db_name: str, table_name: str, short_query: bool = False
) -> Optional[Table]:
"""
Get table by name, returns None if no table has found.
"""
query_sql = GET_TABLE_SQL.format(
db_name=escape(db_name), table_name=escape(table_name)
)

result = self._ch_client.query(query_sql)["data"]
if result:
return self._make_table(result[0])

return None
tables = self.get_tables(db_name, [table_name], short_query)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's add an exception if it is returned > 1

return tables[0] if len(tables) == 1 else None

def does_table_exist(self, db_name: str, table_name: str) -> bool:
"""
Expand Down Expand Up @@ -632,7 +611,7 @@ def restore_replica(self, table: Table) -> None:
"""
Call SYSTEM RESTORE REPLICA for table.
"""
assert is_replicated(table.create_statement)
assert table.is_replicated()
self._ch_client.query(
RESTORE_REPLICA_SQL.format(
db_name=escape(table.database), table_name=escape(table.name)
Expand Down Expand Up @@ -923,7 +902,7 @@ def _make_table(self, record: dict) -> Table:
engine=record.get("engine", None),
disks=list(self._disks.values()),
data_paths=(
record.get("data_paths", None)
record.get("data_paths", [])
if "MergeTree" in record.get("engine", "")
else []
),
Expand Down
58 changes: 58 additions & 0 deletions ch_backup/clickhouse/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,64 @@ def _map_paths_to_disks(
)
)

def is_replicated(self) -> bool:
"""
Return True if table engine belongs to replicated merge tree table engine family, or False otherwise.
"""
return self.is_merge_tree() and self.engine.find("Replicated") != -1

@staticmethod
def engine_is_replicated(engine: str) -> bool:
"""
A static method for determining whether an engine is replicated or not.
"""
return Table("", "", engine, [], [], "", "", None).is_replicated()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Must be adjusted on every change of Table constructor.
Why not just "MergeTree" in engine and "Replicated" in engine and reuse it in is_replicated()


def is_merge_tree(self) -> bool:
"""
Return True if table engine belongs to merge tree table engine family, or False otherwise.
"""
return self.engine.find("MergeTree") != -1

def is_view(self) -> bool:
"""
Return True if table engine is a view (either View or MaterializedView), or False otherwise.
"""
return self.engine in ("View", "LiveView", "MaterializedView")

def is_distributed(self) -> bool:
"""
Return True if it's Distributed table engine, or False otherwise.
"""
return self.engine == "Distributed"

def is_materialized_view(self) -> bool:
"""
Return True if it's MaterializedView table engine, or False otherwise.
"""
return self.engine == "MaterializedView"

def is_external_engine(self) -> bool:
"""
Return True if the specified table engine is intended to use for integration with external systems.
"""
return self.engine in (
"COSN",
"ExternalDistributed",
"HDFS",
"Hive",
"JDBC",
"Kafka",
"MeiliSearch",
"MongoDB",
"MySQL",
"ODBC",
"PostgreSQL",
"RabbitMQ",
"S3",
"URL",
)

def __hash__(self):
return hash((self.database, self.name))

Expand Down
74 changes: 2 additions & 72 deletions ch_backup/clickhouse/schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,75 +9,6 @@
from ch_backup.util import escape


def is_merge_tree(engine: str) -> bool:
"""
Return True if table engine belongs to merge tree table engine family, or False otherwise.
"""
return engine.find("MergeTree") != -1


def is_replicated(engine: str) -> bool:
"""
Return True if table engine belongs to replicated merge tree table engine family, or False otherwise.
"""
return engine.find("Replicated") != -1


def is_distributed(engine: str) -> bool:
"""
Return True if it's Distributed table engine, or False otherwise.
"""
return engine == "Distributed"


def is_view(engine: str) -> bool:
"""
Return True if table engine is a view (either View or MaterializedView), or False otherwise.
"""
return engine in ("View", "LiveView", "MaterializedView")


def is_materialized_view(engine: str) -> bool:
"""
Return True if it's MaterializedView table engine, or False otherwise.
"""
return engine == "MaterializedView"


def is_external_engine(engine: str) -> bool:
"""
Return True if the specified table engine is intended to use for integration with external systems.
"""
return engine in (
"COSN",
"ExternalDistributed",
"HDFS",
"Hive",
"JDBC",
"Kafka",
"MeiliSearch",
"MongoDB",
"MySQL",
"ODBC",
"PostgreSQL",
"RabbitMQ",
"S3",
"URL",
)


def is_external_db_engine(db_engine: str) -> bool:
"""
Return True if the specified database engine is intended to use for integration with external systems.
"""
return db_engine in (
"MySQL",
"MaterializedMySQL",
"PostgreSQL",
"MaterializedPostgreSQL",
)


def to_attach_query(create_query: str) -> str:
"""
Convert CREATE table query to ATTACH one.
Expand Down Expand Up @@ -117,7 +48,7 @@ def rewrite_table_schema(
)
create_statement = create_statement.replace("MergeTree()", "MergeTree")
table.create_statement = create_statement
if is_replicated(table.engine):
if table.is_replicated():
table.engine = table.engine.replace("Replicated", "")

if override_replica_name:
Expand Down Expand Up @@ -182,8 +113,7 @@ def _add_uuid(table: Table, inner_uuid: str = None) -> None:
f" Metadata UUID is {table.uuid}. return without adding UUID"
)
return

if is_view(table.engine):
if table.is_view():
inner_uuid_clause = f"TO INNER UUID '{inner_uuid}'" if inner_uuid else ""
table.create_statement = re.sub(
f"^(?P<statement>CREATE|ATTACH) (?P<view_type>((MATERIALIZED|LIVE) )?VIEW) "
Expand Down
Loading
Loading