From ebccb1469f2289c9303eca397f0b660d50f93269 Mon Sep 17 00:00:00 2001
From: MikhailBurdukov <102754618+MikhailBurdukov@users.noreply.github.com>
Date: Tue, 19 Sep 2023 12:59:45 +0300
Subject: [PATCH] Restore retry when the node has a table based on a table
 function. (#70)

* Initial ver

* Test and style fixes

* Fix test

* Fix query

* Comments
---
 ch_backup/clickhouse/control.py              | 33 +++++++++++++---
 ch_backup/logic/table.py                     |  2 +-
 .../features/backup_restore.feature          | 38 +++++++++++++++++++
 tests/integration/steps/s3.py                | 19 +++++++++-
 4 files changed, 84 insertions(+), 8 deletions(-)

diff --git a/ch_backup/clickhouse/control.py b/ch_backup/clickhouse/control.py
index 394d1d0e..57f8b756 100644
--- a/ch_backup/clickhouse/control.py
+++ b/ch_backup/clickhouse/control.py
@@ -55,6 +55,20 @@
 """
 )
 
+GET_TABLES_SHORT_SQL = strip_query(
+    """
+    SELECT
+        database,
+        name,
+        create_table_query
+    FROM system.tables
+    WHERE (empty('{db_name}') OR database = '{db_name}')
+        AND (empty({table_names}) OR has(cast({table_names}, 'Array(String)'), name))
+    ORDER BY metadata_modification_time
+    FORMAT JSON
+"""
+)
+
 CHECK_TABLE_SQL = strip_query(
     """
     SELECT countIf(database = '{db_name}' AND name = '{table_name}')
@@ -431,12 +445,19 @@ def get_database_engine(self, db_name: str) -> str:
         return self._ch_client.query(query_sql)
 
     def get_tables(
-        self, db_name: str = None, tables: Optional[Sequence[str]] = None
+        self,
+        db_name: Optional[str] = None,
+        tables: Optional[Sequence[str]] = None,
+        short_query: bool = False,
     ) -> Sequence[Table]:
         """
         Get database tables.
+
+        A short query does not access the source of the table if it was built from an external source.
+        Example: CREATE ... AS postgresql() or CREATE ... AS s3().
         """
-        query_sql = GET_TABLES_SQL.format(
+        base_query_sql = GET_TABLES_SHORT_SQL if short_query else GET_TABLES_SQL
+        query_sql = base_query_sql.format(
             db_name=escape(db_name) if db_name is not None else "",
             table_names=list(map(escape, tables)) if tables is not None else [],
         )  # type: ignore
@@ -727,12 +748,12 @@ def _make_table(self, record: dict) -> Table:
         return Table(
             database=record["database"],
             name=record["name"],
-            engine=record["engine"],
+            engine=record.get("engine", None),
             disks=list(self._disks.values()),
-            data_paths=record["data_paths"]
-            if record["engine"].find("MergeTree") != -1
+            data_paths=record.get("data_paths", None)
+            if "MergeTree" in record.get("engine", "")
             else [],
-            metadata_path=record["metadata_path"],
+            metadata_path=record.get("metadata_path", None),
             create_statement=record["create_table_query"],
             uuid=record.get("uuid", None),
         )
diff --git a/ch_backup/logic/table.py b/ch_backup/logic/table.py
index 43b6f7d2..c0b93849 100644
--- a/ch_backup/logic/table.py
+++ b/ch_backup/logic/table.py
@@ -458,7 +458,7 @@ def _preprocess_tables_to_restore(
 
         # Filter out already restored tables.
         existing_tables = {}
-        for table in context.ch_ctl.get_tables():
+        for table in context.ch_ctl.get_tables(short_query=True):
             existing_tables[(table.database, table.name)] = table
 
         result: List[Table] = []
diff --git a/tests/integration/features/backup_restore.feature b/tests/integration/features/backup_restore.feature
index ff73dfc5..06627165 100644
--- a/tests/integration/features/backup_restore.feature
+++ b/tests/integration/features/backup_restore.feature
@@ -185,3 +185,41 @@ Feature: Backup & Restore
     """
     And we restore clickhouse backup #0 to clickhouse02
     Then clickhouse02 has same schema as clickhouse01
+
+  Scenario: Perform retry restore when a table based on a table function exists
+    Given we execute query on clickhouse01
+    """
+    CREATE DATABASE test_s3
+    """
+    When we put object in S3
+    """
+    bucket: cloud-storage-01
+    path: /data.tsv
+    data: '1'
+    """
+
+    When we execute query on clickhouse01
+    """
+    CREATE TABLE test_s3.s3_test_table (v Int) AS
+    s3('{{conf['s3']['endpoint']}}/cloud-storage-01/data.tsv', '{{conf['s3']['access_key_id']}}', '{{conf['s3']['access_secret_key']}}', 'TSV');
+    """
+
+    And we create clickhouse01 clickhouse backup
+
+    And we delete object in S3
+    """
+    bucket: cloud-storage-01
+    path: /data.tsv
+    """
+    # If we restore the data, the table s3_test_table will be created through StorageProxy,
+    # and not all rows from system.tables will be accessible. Check that the restore can be retried.
+    And we restore clickhouse backup #0 to clickhouse01
+    And we restore clickhouse backup #0 to clickhouse01
+    And we execute query on clickhouse01
+    """
+    SELECT count(*) FROM system.tables WHERE name='s3_test_table';
+    """
+    Then we get response
+    """
+    1
+    """
diff --git a/tests/integration/steps/s3.py b/tests/integration/steps/s3.py
index 6f5d400b..165627f9 100644
--- a/tests/integration/steps/s3.py
+++ b/tests/integration/steps/s3.py
@@ -2,10 +2,11 @@
 Steps for interacting with S3.
 """
 
-from behave import given, then
+from behave import given, then, when
 from hamcrest import assert_that, equal_to
 from tests.integration.modules import s3
 from tests.integration.modules.minio import configure_s3_credentials, create_s3_buckets
+from tests.integration.modules.steps import get_step_data
 
 
 @given("a working s3")
@@ -38,3 +39,19 @@ def step_cloud_storage_bucket_contains_files(context, bucket, count):
         equal_to(count),
         f"Objects count = {len(objects)}, expected {count}, objects {objects}",
     )
+
+
+@when("we put object in S3")
+def step_create_file_in_s3(context):
+    conf = get_step_data(context)
+    s3_client = s3.S3Client(context, conf["bucket"])
+    s3_client.upload_data(conf["data"], conf["path"])
+    assert s3_client.path_exists(conf["path"])
+
+
+@when("we delete object in S3")
+def step_delete_file_in_s3(context):
+    conf = get_step_data(context)
+    s3_client = s3.S3Client(context, conf["bucket"])
+    s3_client.delete_data(conf["path"])
+    assert not s3_client.path_exists(conf["path"])
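
Below is a minimal sketch (not part of the patch itself) of the retry-safe table listing that short_query=True enables. ClickhouseCTL.get_tables() and the Table record are the ones defined in ch_backup/clickhouse/control.py above; the helper name tables_to_restore and its wiring are assumptions for illustration.

    # Selecting columns such as data_paths from system.tables forces ClickHouse
    # to touch the table's storage. For a table created as CREATE ... AS s3(...)
    # whose source object has been deleted, that read fails, so a restore retry
    # listing tables with the full query would abort. The short query returns
    # only database, name and create_table_query, which are served purely from
    # table metadata, so the listing stays safe on retries.
    from typing import Set, Tuple

    def tables_to_restore(ch_ctl, backup_tables) -> list:
        """Drop tables that already exist on the node from the restore set."""
        existing: Set[Tuple[str, str]] = {
            (t.database, t.name)
            # short_query=True selects GET_TABLES_SHORT_SQL instead of GET_TABLES_SQL
            for t in ch_ctl.get_tables(short_query=True)
        }
        return [t for t in backup_tables if (t.database, t.name) not in existing]

Only (database, name) pairs are compared when filtering, so the metadata-only listing excludes exactly the same tables as the full query would.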