diff --git a/docs/web/docs/guides/how_to_use/merge_dbs/custom_conflict_resolver.md b/docs/web/docs/guides/how_to_use/merge_dbs/custom_conflict_resolver.md index 77fd228d0..01e077422 100644 --- a/docs/web/docs/guides/how_to_use/merge_dbs/custom_conflict_resolver.md +++ b/docs/web/docs/guides/how_to_use/merge_dbs/custom_conflict_resolver.md @@ -36,3 +36,7 @@ and perhaps `default_strategy_name` value (see `DefaultMergeConflictResolver` as NOTE: All available providers must be present in `strategies_config`. Table names (under each provider key) are optional, and if missing, `default_strategy_name` will be used for all conflicts related to this table. + +4. There is an example of a working custom conflict resolver in module `mephisto/tools/db_data_porter/conflict_resolvers/example_merge_conflict_resolver.py`. You can launch it like this: + +`mephisto db import ... --conflict-resolver ExampleMergeConflictResolver` diff --git a/docs/web/docs/guides/how_to_use/merge_dbs/reference.md b/docs/web/docs/guides/how_to_use/merge_dbs/reference.md index 5cf58becb..55677f590 100644 --- a/docs/web/docs/guides/how_to_use/merge_dbs/reference.md +++ b/docs/web/docs/guides/how_to_use/merge_dbs/reference.md @@ -14,7 +14,7 @@ This is a reference describing set of commands under the `mephisto db` command g ## Export This command exports data from Mephisto DB and provider-specific datastores -as a combination of (i) a JSON file, and (ii) an archived `data` catalog with related files. +as an archived combination of (i) a JSON file, and (ii) a `data` catalog with related files. If no parameter passed, full data dump (i.e. backup) will be created. @@ -29,25 +29,25 @@ mephisto db export --export-tasks-by-ids 1 --export-tasks-by-ids 2 mephisto db export --export-task-runs-by-ids 3 --export-task-runs-by-ids 4 mephisto db export --export-task-runs-since-date 2024-01-01 mephisto db export --export-task-runs-since-date 2023-01-01T00:00:00 -mephisto db export --export-labels first_dump --export-labels second_dump -mephisto db export --export-tasks-by-ids 1 --delete-exported-data --randomize-legacy-ids --export-indent 2 +mephisto db export --labels first_dump --labels second_dump +mephisto db export --export-tasks-by-ids 1 --delete-exported-data --randomize-legacy-ids --export-indent 4 ``` Options (all optional): - `-tn/--export-tasks-by-names` - names of Tasks that will be exported - `-ti/--export-tasks-by-ids` - ids of Tasks that will be exported -- `-tr/--export-task-runs-by-ids` - ids of TaskRuns that will be exported +- `-tri/--export-task-runs-by-ids` - ids of TaskRuns that will be exported - `-trs/--export-task-runs-since-date` - only objects created after this ISO8601 datetime will be exported -- `-tl/--export-labels` - only data imported under these labels will be exported -- `-de/--delete-exported-data` - after exporting data, delete it from local DB +- `-l/--labels` - only data imported under these labels will be exported +- `-del/--delete-exported-data` - after exporting data, delete it from local DB - `-r/--randomize-legacy-ids` - replace legacy autoincremented ids with new pseudo-random ids to avoid conflicts during data merging -- `-i/--export-indent` - make dump easy to read via formatting JSON with indentations +- `-i/--export-indent` - make dump easy to read via formatting JSON with indentations (Default 2) - `-v/--verbosity` - write more informative messages about progress (Default 0. 
Values: 0, 1) Note that the following options cannot be used together: -`--export-tasks-by-names`, `--export-tasks-by-ids`, `--export-task-runs-by-ids`, `--export-task-runs-since-date`, `--export-labels`. +`--export-tasks-by-names`, `--export-tasks-by-ids`, `--export-task-runs-by-ids`, `--export-task-runs-since-date`, `--labels`. ## Import @@ -56,21 +56,21 @@ This command imports data from a dump file created by `mephisto db export` comma Examples: ``` -mephisto db import --dump-file +mephisto db import --file -mephisto db import --dump-file 2024_01_01_00_00_01_mephisto_dump.json --verbosity -mephisto db import --dump-file 2024_01_01_00_00_01_mephisto_dump.json --label-name my_first_dump -mephisto db import --dump-file 2024_01_01_00_00_01_mephisto_dump.json --conflict-resolver MyCustomMergeConflictResolver -mephisto db import --dump-file 2024_01_01_00_00_01_mephisto_dump.json --keep-import-metadata +mephisto db import --file 2024_01_01_00_00_01_mephisto_dump.json --verbosity +mephisto db import --file 2024_01_01_00_00_01_mephisto_dump.json --labels my_first_dump +mephisto db import --file 2024_01_01_00_00_01_mephisto_dump.json --conflict-resolver MyCustomMergeConflictResolver +mephisto db import --file 2024_01_01_00_00_01_mephisto_dump.json --keep-import-metadata ``` Options: -- `-d/--dump-file` - location of the __***.json__ dump file (filename if created in +- `-f/--file` - location of the `***.zip` dump file (filename if created in `/outputs/export` folder, or absolute filepath) - `-cr/--conflict-resolver` (Optional) - name of Python class to be used for resolving merging conflicts (when your local DB already has a row with same unique field value as a DB row in the dump data) -- `-l/--label-name` - a short string serving as a reference for the ported data (stored in `imported_data` table), - so later you can export the imported data with `--export-labels` export option +- `-l/--labels` - one or more short strings serving as a reference for the ported data (stored in `imported_data` table), + so later you can export the imported data with `--labels` export option - `-k/--keep-import-metadata` - write data from `imported_data` table of the dump (by default it's not imported) - `-v/--verbosity` - level of logging (default: 0; values: 0, 1) @@ -95,13 +95,13 @@ Note that it will erase all current data, and you may want to run command `mephi Examples: ``` -mephisto db restore --backup-file +mephisto db restore --file -mephisto db restore --backup-file 2024_01_01_00_10_01.zip +mephisto db restore --file 2024_01_01_00_10_01.zip ``` Options: -- `-b/--backup-file` - location of the __*.zip__ backup file (filename if created in +- `-f/--file` - location of the `***.zip` backup file (filename if created in `/outputs/backup` folder, or absolute filepath) - `-v/--verbosity` - level of logging (default: 0; values: 0, 1) diff --git a/docs/web/docs/guides/how_to_use/merge_dbs/simple_usage.md b/docs/web/docs/guides/how_to_use/merge_dbs/simple_usage.md index 3ebf9020a..1c2089dc6 100644 --- a/docs/web/docs/guides/how_to_use/merge_dbs/simple_usage.md +++ b/docs/web/docs/guides/how_to_use/merge_dbs/simple_usage.md @@ -45,8 +45,8 @@ mephisto db backup And you will see text like this ``` -Started making backup -Finished successfully! File: '//outputs/backup/2024_01_01_00_00_01_mephisto_backup.zip +Started creating backup file ... +Finished successfully! File: //outputs/backup/2024_01_01_00_00_01_mephisto_backup.zip ``` Find and copy this file. 
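A minimal sketch of how the renamed options documented above work together in a label-based round trip (the filenames are illustrative, following the examples in this guide):

```shell
# Import a dump under a label, using the bundled example conflict resolver
mephisto db import --file 2024_01_01_00_00_01_mephisto_dump.zip \
    --labels my_first_dump --conflict-resolver ExampleMergeConflictResolver

# Later, re-export only the data that was imported under that label
mephisto db export --labels my_first_dump --export-indent 2
```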
@@ -79,31 +79,30 @@ mephisto db export --randomize-legacy-ids And you will see text like this ``` -Started exporting -Run command for all TaskRuns. +Started exporting data ... +No filter for TaskRun specified - exporting all TaskRuns. Finished successfully! Files created: - - Database dump - //outputs/export/2024_01_01_00_00_01_mephisto_dump.json - - Data files dump - //outputs/export/2024_01_01_00_00_01_mephisto_dump.zip + - Dump archive - //outputs/export/2024_01_01_00_00_01_mephisto_dump.zip ``` ### Import just created dump into main project -Put your dump into export directory `/mephisto/outputs/export/` and you can use just a dump name in the command, +Put your dump into the export directory `//outputs/export/` and you can use just a dump name in the command, or use a full path to the file. Let's just imagine, you put file in export directory: ```shell -mephisto db import --dump-file 2024_01_01_00_00_01_mephisto_dump.json +mephisto db import --file 2024_01_01_00_00_01_mephisto_dump.zip ``` And you will see text like this ``` -Started importing from dump '2024_01_01_00_00_01_mephisto_dump.json' Are you sure? It will affect your databases and related files. Type 'yes' and press Enter if you want to proceed: yes Just in case, we are making a backup of all your local data. If something went wrong during import, we will restore all your data from this backup -Backup was created successfully! File: '/mephisto/outputs/backup/2024_01_01_00_10_01_mephisto_backup.zip' +Backup was created successfully! File: '//outputs/backup/2024_01_01_00_10_01_mephisto_backup.zip' +Started importing from dump file //outputs/export/2024_01_01_00_00_01_mephisto_dump.zip ... Finished successfully ``` @@ -117,14 +116,14 @@ Also, we create a backup automatically just in case too, just before all changes No worries, just restore everything from your or our backup: ```shell -mephisto db restore --backup-file 2024_01_01_00_10_01.zip +mephisto db restore --file 2024_01_01_00_10_01_mephisto_backup.zip ``` And you will see text like this ``` -Started restoring from backup '2024_01_01_00_10_01.zip' Are you sure? It will affect your databases and related files. Type 'yes' and press Enter if you want to proceed: yes +Started restoring from backup //outputs/backup/2024_01_01_00_10_01_mephisto_backup.zip ...
Finished successfully ``` diff --git a/mephisto/abstractions/databases/local_database.py b/mephisto/abstractions/databases/local_database.py index c6b0ef676..bfca8f822 100644 --- a/mephisto/abstractions/databases/local_database.py +++ b/mephisto/abstractions/databases/local_database.py @@ -224,7 +224,7 @@ def _new_project(self, project_name: str) -> str: ( make_randomized_int_id(), project_name, - ) + ), ) project_id = str(c.lastrowid) return project_id @@ -264,7 +264,8 @@ def _find_projects(self, project_name: Optional[str] = None) -> List[Project]: """ SELECT * from projects """ - + additional_query, + + additional_query + + " ORDER BY creation_date ASC", arg_tuple, ) rows = c.fetchall() @@ -311,7 +312,10 @@ def _new_task( raise EntryDoesNotExistException(e) elif is_unique_failure(e): raise EntryAlreadyExistsException( - e, db=self, table_name="tasks", original_exc=e, + e, + db=self, + table_name="tasks", + original_exc=e, ) raise MephistoDBException(e) @@ -344,7 +348,8 @@ def _find_tasks( """ SELECT * from tasks """ - + additional_query, + + additional_query + + " ORDER BY creation_date ASC", arg_tuple, ) rows = c.fetchall() @@ -449,7 +454,10 @@ def _new_task_run( raise EntryDoesNotExistException(e) elif is_unique_failure(e): raise EntryAlreadyExistsException( - e, db=self, table_name="task_runs", original_exc=e, + e, + db=self, + table_name="task_runs", + original_exc=e, ) raise MephistoDBException(e) @@ -483,7 +491,8 @@ def _find_task_runs( """ SELECT * from task_runs """ - + additional_query, + + additional_query + + " ORDER BY creation_date ASC", arg_tuple, ) rows = c.fetchall() @@ -552,7 +561,10 @@ def _new_assignment( except sqlite3.IntegrityError as e: if is_unique_failure(e): raise EntryAlreadyExistsException( - e, db=self, table_name="assignments", original_exc=e, + e, + db=self, + table_name="assignments", + original_exc=e, ) raise MephistoDBException(e) @@ -603,7 +615,8 @@ def _find_assignments( """ SELECT * from assignments """ - + additional_query, + + additional_query + + " ORDER BY creation_date ASC", arg_tuple, ) rows = c.fetchall() @@ -668,7 +681,10 @@ def _new_unit( raise EntryDoesNotExistException(e) elif is_unique_failure(e): raise EntryAlreadyExistsException( - e, db=self, table_name="units", original_exc=e, + e, + db=self, + table_name="units", + original_exc=e, ) raise MephistoDBException(e) @@ -734,7 +750,8 @@ def _find_units( """ SELECT * from units """ - + additional_query, + + additional_query + + " ORDER BY creation_date ASC", arg_tuple, ) rows = c.fetchall() @@ -817,7 +834,7 @@ def _new_requester(self, requester_name: str, provider_type: str) -> str: """ INSERT INTO requesters( requester_id, - requester_name, + requester_name, provider_type ) VALUES (?, ?, ?); """, @@ -832,7 +849,10 @@ def _new_requester(self, requester_name: str, provider_type: str) -> str: except sqlite3.IntegrityError as e: if is_unique_failure(e): raise EntryAlreadyExistsException( - e, db=self, table_name="requesters", original_exc=e, + e, + db=self, + table_name="requesters", + original_exc=e, ) raise MephistoDBException(e) @@ -862,7 +882,8 @@ def _find_requesters( """ SELECT * from requesters """ - + additional_query, + + additional_query + + " ORDER BY creation_date ASC", arg_tuple, ) rows = c.fetchall() @@ -890,7 +911,7 @@ def _new_worker(self, worker_name: str, provider_type: str) -> str: """ INSERT INTO workers( worker_id, - worker_name, + worker_name, provider_type ) VALUES (?, ?, ?); """, @@ -905,7 +926,10 @@ def _new_worker(self, worker_name: str, provider_type: str) -> str: 
except sqlite3.IntegrityError as e: if is_unique_failure(e): raise EntryAlreadyExistsException( - e, db=self, table_name="workers", original_exc=e, + e, + db=self, + table_name="workers", + original_exc=e, ) raise MephistoDBException(e) @@ -935,7 +959,8 @@ def _find_workers( """ SELECT * from workers """ - + additional_query, + + additional_query + + " ORDER BY creation_date ASC", arg_tuple, ) rows = c.fetchall() @@ -1006,7 +1031,10 @@ def _new_agent( raise EntryDoesNotExistException(e) elif is_unique_failure(e): raise EntryAlreadyExistsException( - e, db=self, table_name="agents", original_exc=e, + e, + db=self, + table_name="agents", + original_exc=e, ) raise MephistoDBException(e) @@ -1082,7 +1110,8 @@ def _find_agents( """ SELECT * from agents """ - + additional_query, + + additional_query + + " ORDER BY creation_date ASC", arg_tuple, ) rows = c.fetchall() @@ -1107,7 +1136,10 @@ def _make_qualification(self, qualification_name: str) -> str: except sqlite3.IntegrityError as e: if is_unique_failure(e): raise EntryAlreadyExistsException( - e, db=self, table_name="units", original_exc=e, + e, + db=self, + table_name="qualifications", + original_exc=e, ) raise MephistoDBException(e) @@ -1125,7 +1157,8 @@ def _find_qualifications(self, qualification_name: Optional[str] = None) -> List """ SELECT * from qualifications """ - + additional_query, + + additional_query + + " ORDER BY creation_date ASC", arg_tuple, ) rows = c.fetchall() @@ -1204,7 +1237,10 @@ def _grant_qualification(self, qualification_id: str, worker_id: str, value: int except sqlite3.IntegrityError as e: if is_unique_failure(e): raise EntryAlreadyExistsException( - e, db=self, table_name="units", original_exc=e, + e, + db=self, + table_name="granted_qualifications", + original_exc=e, ) raise MephistoDBException(e) @@ -1319,7 +1355,10 @@ def _new_onboarding_agent( raise EntryDoesNotExistException(e) elif is_unique_failure(e): raise EntryAlreadyExistsException( - e, db=self, table_name="onboarding_agents", original_exc=e, + e, + db=self, + table_name="onboarding_agents", + original_exc=e, ) raise MephistoDBException(e) @@ -1388,7 +1427,8 @@ def _find_onboarding_agents( """ SELECT * from onboarding_agents """ - + additional_query, + + additional_query + + " ORDER BY creation_date ASC", arg_tuple, ) rows = c.fetchall() @@ -1439,7 +1479,10 @@ def _new_unit_review( except sqlite3.IntegrityError as e: if is_unique_failure(e): raise EntryAlreadyExistsException( - e, db=self, table_name="unit_review", original_exc=e, + e, + db=self, + table_name="unit_review", + original_exc=e, ) raise MephistoDBException(e) diff --git a/mephisto/abstractions/databases/migrations/_001_20240325_preparing_db_for_merge_dbs_command.py b/mephisto/abstractions/databases/migrations/_001_20240325_preparing_db_for_merge_dbs_command.py index 0f8094fe2..20061a12b 100644 --- a/mephisto/abstractions/databases/migrations/_001_20240325_preparing_db_for_merge_dbs_command.py +++ b/mephisto/abstractions/databases/migrations/_001_20240325_preparing_db_for_merge_dbs_command.py @@ -9,27 +9,28 @@ 2. Remove autoincrement parameter for all Primary Keys 3. Add missed Foreign Keys in `agents` table 4. Add `granted_qualifications.update_date` +5.
Modified default value for `creation_date` """ PREPARING_DB_FOR_MERGE_DBS_COMMAND = """ ALTER TABLE unit_review RENAME COLUMN created_at TO creation_date; - + /* Disable FK constraints */ PRAGMA foreign_keys = off; - - + + /* Projects */ CREATE TABLE IF NOT EXISTS _projects ( project_id INTEGER PRIMARY KEY, project_name TEXT NOT NULL UNIQUE, - creation_date DATETIME DEFAULT CURRENT_TIMESTAMP + creation_date DATETIME DEFAULT(STRFTIME('%Y-%m-%d %H:%M:%f', 'NOW')) ); INSERT INTO _projects SELECT * FROM projects; DROP TABLE projects; ALTER TABLE _projects RENAME TO projects; - - + + /* Tasks */ CREATE TABLE IF NOT EXISTS _tasks ( task_id INTEGER PRIMARY KEY, @@ -37,27 +38,27 @@ task_type TEXT NOT NULL, project_id INTEGER, parent_task_id INTEGER, - creation_date DATETIME DEFAULT CURRENT_TIMESTAMP, + creation_date DATETIME DEFAULT(STRFTIME('%Y-%m-%d %H:%M:%f', 'NOW')), FOREIGN KEY (parent_task_id) REFERENCES tasks (task_id), FOREIGN KEY (project_id) REFERENCES projects (project_id) ); INSERT INTO _tasks SELECT * FROM tasks; DROP TABLE tasks; ALTER TABLE _tasks RENAME TO tasks; - - + + /* Requesters */ CREATE TABLE IF NOT EXISTS _requesters ( requester_id INTEGER PRIMARY KEY, requester_name TEXT NOT NULL UNIQUE, provider_type TEXT NOT NULL, - creation_date DATETIME DEFAULT CURRENT_TIMESTAMP + creation_date DATETIME DEFAULT(STRFTIME('%Y-%m-%d %H:%M:%f', 'NOW')) ); INSERT INTO _requesters SELECT * FROM requesters; DROP TABLE requesters; ALTER TABLE _requesters RENAME TO requesters; - - + + /* Task Runs */ CREATE TABLE IF NOT EXISTS _task_runs ( task_run_id INTEGER PRIMARY KEY, @@ -68,15 +69,15 @@ provider_type TEXT NOT NULL, task_type TEXT NOT NULL, sandbox BOOLEAN NOT NULL, - creation_date DATETIME DEFAULT CURRENT_TIMESTAMP, + creation_date DATETIME DEFAULT(STRFTIME('%Y-%m-%d %H:%M:%f', 'NOW')), FOREIGN KEY (task_id) REFERENCES tasks (task_id), FOREIGN KEY (requester_id) REFERENCES requesters (requester_id) ); INSERT INTO _task_runs SELECT * FROM task_runs; DROP TABLE task_runs; ALTER TABLE _task_runs RENAME TO task_runs; - - + + /* Assignments */ CREATE TABLE IF NOT EXISTS _assignments ( assignment_id INTEGER PRIMARY KEY, @@ -86,7 +87,7 @@ task_type TEXT NOT NULL, provider_type TEXT NOT NULL, sandbox BOOLEAN NOT NULL, - creation_date DATETIME DEFAULT CURRENT_TIMESTAMP, + creation_date DATETIME DEFAULT(STRFTIME('%Y-%m-%d %H:%M:%f', 'NOW')), FOREIGN KEY (task_id) REFERENCES tasks (task_id), FOREIGN KEY (task_run_id) REFERENCES task_runs (task_run_id), FOREIGN KEY (requester_id) REFERENCES requesters (requester_id) @@ -94,8 +95,8 @@ INSERT INTO _assignments SELECT * FROM assignments; DROP TABLE assignments; ALTER TABLE _assignments RENAME TO assignments; - - + + /* Units */ CREATE TABLE IF NOT EXISTS _units ( unit_id INTEGER PRIMARY KEY, @@ -111,7 +112,7 @@ task_run_id INTEGER NOT NULL, sandbox BOOLEAN NOT NULL, requester_id INTEGER NOT NULL, - creation_date DATETIME DEFAULT CURRENT_TIMESTAMP, + creation_date DATETIME DEFAULT(STRFTIME('%Y-%m-%d %H:%M:%f', 'NOW')), FOREIGN KEY (assignment_id) REFERENCES assignments (assignment_id), FOREIGN KEY (agent_id) REFERENCES agents (agent_id), FOREIGN KEY (task_run_id) REFERENCES task_runs (task_run_id), @@ -123,20 +124,20 @@ INSERT INTO _units SELECT * FROM units; DROP TABLE units; ALTER TABLE _units RENAME TO units; - - + + /* Workers */ CREATE TABLE IF NOT EXISTS _workers ( worker_id INTEGER PRIMARY KEY, worker_name TEXT NOT NULL UNIQUE, provider_type TEXT NOT NULL, - creation_date DATETIME DEFAULT CURRENT_TIMESTAMP + creation_date DATETIME 
DEFAULT(STRFTIME('%Y-%m-%d %H:%M:%f', 'NOW')) ); INSERT INTO _workers SELECT * FROM workers; DROP TABLE workers; ALTER TABLE _workers RENAME TO workers; - - + + /* Agents */ CREATE TABLE IF NOT EXISTS _agents ( agent_id INTEGER PRIMARY KEY, @@ -148,7 +149,7 @@ task_type TEXT NOT NULL, provider_type TEXT NOT NULL, status TEXT NOT NULL, - creation_date DATETIME DEFAULT CURRENT_TIMESTAMP, + creation_date DATETIME DEFAULT(STRFTIME('%Y-%m-%d %H:%M:%f', 'NOW')), FOREIGN KEY (worker_id) REFERENCES workers (worker_id), FOREIGN KEY (unit_id) REFERENCES units (unit_id), FOREIGN KEY (task_id) REFERENCES tasks (task_id) ON DELETE NO ACTION, @@ -158,8 +159,8 @@ INSERT INTO _agents SELECT * FROM agents; DROP TABLE agents; ALTER TABLE _agents RENAME TO agents; - - + + /* Onboarding Agents */ CREATE TABLE IF NOT EXISTS _onboarding_agents ( onboarding_agent_id INTEGER PRIMARY KEY, @@ -168,52 +169,52 @@ task_run_id INTEGER NOT NULL, task_type TEXT NOT NULL, status TEXT NOT NULL, - creation_date DATETIME DEFAULT CURRENT_TIMESTAMP, + creation_date DATETIME DEFAULT(STRFTIME('%Y-%m-%d %H:%M:%f', 'NOW')), FOREIGN KEY (worker_id) REFERENCES workers (worker_id), FOREIGN KEY (task_run_id) REFERENCES task_runs (task_run_id) ); INSERT INTO _onboarding_agents SELECT * FROM onboarding_agents; DROP TABLE onboarding_agents; ALTER TABLE _onboarding_agents RENAME TO onboarding_agents; - - + + /* Qualifications */ CREATE TABLE IF NOT EXISTS _qualifications ( qualification_id INTEGER PRIMARY KEY, qualification_name TEXT NOT NULL UNIQUE, - creation_date DATETIME DEFAULT CURRENT_TIMESTAMP + creation_date DATETIME DEFAULT(STRFTIME('%Y-%m-%d %H:%M:%f', 'NOW')) ); INSERT INTO _qualifications SELECT * FROM qualifications; DROP TABLE qualifications; ALTER TABLE _qualifications RENAME TO qualifications; - - + + /* Granted Qualifications */ CREATE TABLE IF NOT EXISTS _granted_qualifications ( granted_qualification_id INTEGER PRIMARY KEY, worker_id INTEGER NOT NULL, qualification_id INTEGER NOT NULL, value INTEGER NOT NULL, - update_date DATETIME DEFAULT CURRENT_TIMESTAMP, - creation_date DATETIME DEFAULT CURRENT_TIMESTAMP, + update_date DATETIME DEFAULT(STRFTIME('%Y-%m-%d %H:%M:%f', 'NOW')), + creation_date DATETIME DEFAULT(STRFTIME('%Y-%m-%d %H:%M:%f', 'NOW')), FOREIGN KEY (worker_id) REFERENCES workers (worker_id), FOREIGN KEY (qualification_id) REFERENCES qualifications (qualification_id), UNIQUE (worker_id, qualification_id) ); /* Copy data from backed up table and set value from `creation_date` to `update_date` */ - INSERT INTO _granted_qualifications - SELECT - granted_qualification_id, - worker_id, - qualification_id, - value, - creation_date, - creation_date + INSERT INTO _granted_qualifications + SELECT + granted_qualification_id, + worker_id, + qualification_id, + value, + creation_date, + creation_date FROM granted_qualifications; DROP TABLE granted_qualifications; ALTER TABLE _granted_qualifications RENAME TO granted_qualifications; - - + + /* Unit Review */ CREATE TABLE IF NOT EXISTS _unit_review ( id INTEGER PRIMARY KEY, @@ -229,7 +230,7 @@ updated_qualification_value INTEGER, /* ID of `db.qualifications` (not `db.granted_qualifications`) */ revoked_qualification_id INTEGER, - creation_date DATETIME DEFAULT CURRENT_TIMESTAMP, + creation_date DATETIME DEFAULT(STRFTIME('%Y-%m-%d %H:%M:%f', 'NOW')), FOREIGN KEY (unit_id) REFERENCES units (unit_id), FOREIGN KEY (worker_id) REFERENCES workers (worker_id), @@ -238,8 +239,8 @@ INSERT INTO _unit_review SELECT * FROM unit_review; DROP TABLE unit_review; ALTER TABLE _unit_review 
RENAME TO unit_review; - - + + /* Enable FK constraints back */ PRAGMA foreign_keys = on; """ diff --git a/mephisto/abstractions/providers/mock/mock_datastore.py b/mephisto/abstractions/providers/mock/mock_datastore.py index da17a832d..1b4ac5912 100644 --- a/mephisto/abstractions/providers/mock/mock_datastore.py +++ b/mephisto/abstractions/providers/mock/mock_datastore.py @@ -132,7 +132,6 @@ def ensure_worker_exists(self, worker_id: str) -> None: table_name="workers", params={ "worker_id": worker_id, - "is_blocked": False, }, select_field="worker_id", ) diff --git a/mephisto/abstractions/providers/mock/mock_datastore_export.py b/mephisto/abstractions/providers/mock/mock_datastore_export.py index f3fc14ecd..a0fdb45dc 100644 --- a/mephisto/abstractions/providers/mock/mock_datastore_export.py +++ b/mephisto/abstractions/providers/mock/mock_datastore_export.py @@ -27,14 +27,20 @@ def export_datastore( # Find and serialize `units` unit_ids = [i["unit_id"] for i in mephisto_db_data["units"]] unit_rows = db_utils.select_rows_by_list_of_field_values( - datastore, "units", ["unit_id"], [unit_ids], + datastore, + "units", + ["unit_id"], + [unit_ids], ) dump_data["units"] = db_utils.serialize_data_for_table(unit_rows) # Find and serialize `workers` worker_ids = [i["worker_id"] for i in mephisto_db_data["workers"]] workers_rows = db_utils.select_rows_by_list_of_field_values( - datastore, "workers", ["worker_id"], [worker_ids], + datastore, + "workers", + ["worker_id"], + [worker_ids], ) dump_data["workers"] = db_utils.serialize_data_for_table(workers_rows) diff --git a/mephisto/abstractions/providers/mturk/migrations/_001_20240325_preparing_db_for_merge_dbs_command.py b/mephisto/abstractions/providers/mturk/migrations/_001_20240325_preparing_db_for_merge_dbs_command.py new file mode 100644 index 000000000..d38dd9d40 --- /dev/null +++ b/mephisto/abstractions/providers/mturk/migrations/_001_20240325_preparing_db_for_merge_dbs_command.py @@ -0,0 +1,70 @@ +#!/usr/bin/env python3 + +# Copyright (c) Meta Platforms and its affiliates. +# This source code is licensed under the MIT license found in the +# LICENSE file in the root directory of this source tree. + +""" +1. 
Modified default value for `creation_date` +""" + + +PREPARING_DB_FOR_MERGE_DBS_COMMAND = """ + /* Disable FK constraints */ + PRAGMA foreign_keys = off; + + + /* Hits */ + CREATE TABLE IF NOT EXISTS _hits ( + hit_id TEXT PRIMARY KEY UNIQUE, + unit_id TEXT, + assignment_id TEXT, + link TEXT, + assignment_time_in_seconds INTEGER NOT NULL, + creation_date DATETIME DEFAULT(STRFTIME('%Y-%m-%d %H:%M:%f', 'NOW')) + ); + INSERT INTO _hits SELECT * FROM hits; + DROP TABLE hits; + ALTER TABLE _hits RENAME TO hits; + + + /* Run mappings */ + CREATE TABLE IF NOT EXISTS _run_mappings ( + hit_id TEXT, + run_id TEXT + ); + INSERT INTO _run_mappings SELECT * FROM run_mappings; + DROP TABLE run_mappings; + ALTER TABLE _run_mappings RENAME TO run_mappings; + + + /* Runs */ + CREATE TABLE IF NOT EXISTS _runs ( + run_id TEXT PRIMARY KEY UNIQUE, + arn_id TEXT, + hit_type_id TEXT NOT NULL, + hit_config_path TEXT NOT NULL, + creation_date DATETIME DEFAULT(STRFTIME('%Y-%m-%d %H:%M:%f', 'NOW')), + frame_height INTEGER NOT NULL DEFAULT 650 + ); + INSERT INTO _runs SELECT * FROM runs; + DROP TABLE runs; + ALTER TABLE _runs RENAME TO runs; + + + /* Qualifications */ + CREATE TABLE IF NOT EXISTS _qualifications ( + qualification_name TEXT PRIMARY KEY UNIQUE, + requester_id TEXT, + mturk_qualification_name TEXT, + mturk_qualification_id TEXT, + creation_date DATETIME DEFAULT(STRFTIME('%Y-%m-%d %H:%M:%f', 'NOW')) + ); + INSERT INTO _qualifications SELECT * FROM qualifications; + DROP TABLE qualifications; + ALTER TABLE _qualifications RENAME TO qualifications; + + + /* Enable FK constraints back */ + PRAGMA foreign_keys = on; +""" diff --git a/mephisto/abstractions/providers/mturk/migrations/__init__.py b/mephisto/abstractions/providers/mturk/migrations/__init__.py new file mode 100644 index 000000000..092965e1b --- /dev/null +++ b/mephisto/abstractions/providers/mturk/migrations/__init__.py @@ -0,0 +1,12 @@ +#!/usr/bin/env python3 + +# Copyright (c) Meta Platforms and its affiliates. +# This source code is licensed under the MIT license found in the +# LICENSE file in the root directory of this source tree. + +from ._001_20240325_preparing_db_for_merge_dbs_command import * + + +migrations = { + "20240418_preparing_db_for_merge_dbs_command": PREPARING_DB_FOR_MERGE_DBS_COMMAND, +} diff --git a/mephisto/abstractions/providers/mturk/mturk_datastore.py b/mephisto/abstractions/providers/mturk/mturk_datastore.py index 517cae0b0..714860e6b 100644 --- a/mephisto/abstractions/providers/mturk/mturk_datastore.py +++ b/mephisto/abstractions/providers/mturk/mturk_datastore.py @@ -18,8 +18,10 @@ from botocore.exceptions import ProfileNotFound # type: ignore from mephisto.abstractions.databases.local_database import is_unique_failure +from mephisto.utils.db import apply_migrations from mephisto.utils.logger_core import get_logger from . 
import mturk_datastore_tables as tables +from .migrations import migrations from .mturk_datastore_export import export_datastore MTURK_REGION_NAME = "us-east-1" @@ -90,6 +92,8 @@ def init_tables(self) -> None: except Exception: pass # extra column already exists + apply_migrations(self, migrations) + def get_export_data(self, **kwargs) -> dict: return export_datastore(self, **kwargs) diff --git a/mephisto/abstractions/providers/mturk/mturk_datastore_export.py b/mephisto/abstractions/providers/mturk/mturk_datastore_export.py index f538801a5..a558926cb 100644 --- a/mephisto/abstractions/providers/mturk/mturk_datastore_export.py +++ b/mephisto/abstractions/providers/mturk/mturk_datastore_export.py @@ -31,7 +31,10 @@ def export_datastore( for table_name in tables_with_task_run_relations: table_rows = db_utils.select_rows_by_list_of_field_values( - datastore, table_name, ["run_id"], [task_run_ids], + datastore, + table_name, + ["run_id"], + [task_run_ids], ) runs_table_data = db_utils.serialize_data_for_table(table_rows) dump_data[table_name] = runs_table_data @@ -39,7 +42,10 @@ def export_datastore( # Find and serialize `hits` hit_ids = list(set(filter(bool, [i["hit_id"] for i in dump_data["run_mappings"]]))) hit_rows = db_utils.select_rows_by_list_of_field_values( - datastore, "hits", ["hit_id"], [hit_ids], + datastore, + "hits", + ["hit_id"], + [hit_ids], ) dump_data["hits"] = db_utils.serialize_data_for_table(hit_rows) @@ -47,7 +53,10 @@ def export_datastore( qualification_names = [i["qualification_name"] for i in mephisto_db_data["qualifications"]] if qualification_names: qualification_rows = db_utils.select_rows_by_list_of_field_values( - datastore, "qualifications", ["qualification_name"], [qualification_names], + datastore, + "qualifications", + ["qualification_name"], + [qualification_names], ) else: qualification_rows = db_utils.select_all_table_rows(datastore, "qualifications") diff --git a/mephisto/abstractions/providers/prolific/migrations/_001_20240325_preparing_db_for_merge_dbs_command.py b/mephisto/abstractions/providers/prolific/migrations/_001_20240325_preparing_db_for_merge_dbs_command.py index 6dbe1c116..f792c223e 100644 --- a/mephisto/abstractions/providers/prolific/migrations/_001_20240325_preparing_db_for_merge_dbs_command.py +++ b/mephisto/abstractions/providers/prolific/migrations/_001_20240325_preparing_db_for_merge_dbs_command.py @@ -10,14 +10,15 @@ 3. Added `creation_date` in `units` table 4. Rename field `run_id` -> `task_run_id` 5. Remove table `requesters` +6. 
Modified default value for `creation_date` """ PREPARING_DB_FOR_MERGE_DBS_COMMAND = """ /* Disable FK constraints */ PRAGMA foreign_keys = off; - - + + /* Studies */ CREATE TABLE IF NOT EXISTS _studies ( id INTEGER PRIMARY KEY, @@ -26,26 +27,26 @@ link TEXT, task_run_id TEXT UNIQUE, assignment_time_in_seconds INTEGER NOT NULL, - creation_date DATETIME DEFAULT CURRENT_TIMESTAMP + creation_date DATETIME DEFAULT(STRFTIME('%Y-%m-%d %H:%M:%f', 'NOW')) ); INSERT INTO _studies SELECT * FROM studies; DROP TABLE studies; ALTER TABLE _studies RENAME TO studies; - - + + /* Submissions */ CREATE TABLE IF NOT EXISTS _submissions ( id INTEGER PRIMARY KEY, prolific_submission_id TEXT UNIQUE, prolific_study_id TEXT, status TEXT DEFAULT NULL, - creation_date DATETIME DEFAULT CURRENT_TIMESTAMP + creation_date DATETIME DEFAULT(STRFTIME('%Y-%m-%d %H:%M:%f', 'NOW')) ); INSERT INTO _submissions SELECT * FROM submissions; DROP TABLE submissions; ALTER TABLE _submissions RENAME TO submissions; - - + + /* Run Mappings */ CREATE TABLE IF NOT EXISTS _run_mappings ( id INTEGER PRIMARY KEY, @@ -55,8 +56,8 @@ INSERT INTO _run_mappings SELECT * FROM run_mappings; DROP TABLE run_mappings; ALTER TABLE _run_mappings RENAME TO run_mappings; - - + + /* Units */ CREATE TABLE IF NOT EXISTS _units ( id INTEGER PRIMARY KEY, @@ -65,44 +66,44 @@ prolific_study_id TEXT, prolific_submission_id TEXT, is_expired BOOLEAN DEFAULT false, - creation_date DATETIME DEFAULT CURRENT_TIMESTAMP + creation_date DATETIME DEFAULT(STRFTIME('%Y-%m-%d %H:%M:%f', 'NOW')) ); /* Copy data from backed up table and set values for `creation_date` */ - INSERT INTO _units - SELECT - id, - unit_id, - run_id, - prolific_study_id, - prolific_submission_id, - is_expired, - datetime('now', 'localtime') + INSERT INTO _units + SELECT + id, + unit_id, + run_id, + prolific_study_id, + prolific_submission_id, + is_expired, + datetime('now', 'localtime') FROM units; DROP TABLE units; ALTER TABLE _units RENAME TO units; - - + + /* Workers */ CREATE TABLE IF NOT EXISTS _workers ( id INTEGER PRIMARY KEY, worker_id TEXT UNIQUE, is_blocked BOOLEAN default false, - update_date DATETIME DEFAULT CURRENT_TIMESTAMP, - creation_date DATETIME DEFAULT CURRENT_TIMESTAMP + update_date DATETIME DEFAULT(STRFTIME('%Y-%m-%d %H:%M:%f', 'NOW')), + creation_date DATETIME DEFAULT(STRFTIME('%Y-%m-%d %H:%M:%f', 'NOW')) ); /* Copy data from backed up table and set values for `creation_date` and `update_date` */ - INSERT INTO _workers - SELECT - id, - worker_id, - is_blocked, - datetime('now', 'localtime'), - datetime('now', 'localtime') + INSERT INTO _workers + SELECT + id, + worker_id, + is_blocked, + datetime('now', 'localtime'), + datetime('now', 'localtime') FROM workers; DROP TABLE workers; ALTER TABLE _workers RENAME TO workers; - - + + /* Runs */ CREATE TABLE IF NOT EXISTS _runs ( id INTEGER PRIMARY KEY, @@ -112,7 +113,7 @@ prolific_project_id TEXT NOT NULL, prolific_study_id TEXT, prolific_study_config_path TEXT NOT NULL, - creation_date DATETIME DEFAULT CURRENT_TIMESTAMP, + creation_date DATETIME DEFAULT(STRFTIME('%Y-%m-%d %H:%M:%f', 'NOW')), frame_height INTEGER NOT NULL DEFAULT 650, actual_available_places INTEGER DEFAULT NULL, listed_available_places INTEGER DEFAULT NULL @@ -120,8 +121,8 @@ INSERT INTO _runs SELECT * FROM runs; DROP TABLE runs; ALTER TABLE _runs RENAME TO runs; - - + + /* Participant Groups */ CREATE TABLE IF NOT EXISTS _participant_groups ( id INTEGER PRIMARY KEY, @@ -130,13 +131,13 @@ prolific_project_id TEXT, prolific_participant_group_name TEXT, 
prolific_participant_group_id TEXT, - creation_date DATETIME DEFAULT CURRENT_TIMESTAMP + creation_date DATETIME DEFAULT(STRFTIME('%Y-%m-%d %H:%M:%f', 'NOW')) ); INSERT INTO _participant_groups SELECT * FROM participant_groups; DROP TABLE participant_groups; ALTER TABLE _participant_groups RENAME TO participant_groups; - - + + /* Runs */ CREATE TABLE IF NOT EXISTS _qualifications ( id INTEGER PRIMARY KEY, @@ -144,21 +145,21 @@ task_run_id TEXT, json_qual_logic TEXT, qualification_ids TEXT, - creation_date DATETIME DEFAULT CURRENT_TIMESTAMP + creation_date DATETIME DEFAULT(STRFTIME('%Y-%m-%d %H:%M:%f', 'NOW')) ); INSERT INTO _qualifications SELECT * FROM qualifications; DROP TABLE qualifications; ALTER TABLE _qualifications RENAME TO qualifications; - - + + /* Enable FK constraints back */ PRAGMA foreign_keys = on; - - + + ALTER TABLE run_mappings RENAME COLUMN run_id TO task_run_id; ALTER TABLE units RENAME COLUMN run_id TO task_run_id; ALTER TABLE runs RENAME COLUMN run_id TO task_run_id; - - + + DROP TABLE IF EXISTS requesters; """ diff --git a/mephisto/abstractions/providers/prolific/prolific_datastore_export.py b/mephisto/abstractions/providers/prolific/prolific_datastore_export.py index cc1d85027..502058041 100644 --- a/mephisto/abstractions/providers/prolific/prolific_datastore_export.py +++ b/mephisto/abstractions/providers/prolific/prolific_datastore_export.py @@ -34,7 +34,9 @@ def export_datastore( for table_name in tables_with_task_run_relations: table_rows = db_utils.select_rows_from_table_related_to_task_run( - datastore, table_name, task_run_ids, + datastore, + table_name, + task_run_ids, ) runs_table_data = db_utils.serialize_data_for_table(table_rows) dump_data[table_name] = runs_table_data @@ -42,16 +44,22 @@ def export_datastore( # Find and serialize `submissions` study_ids = list(set(filter(bool, [i["prolific_study_id"] for i in dump_data["studies"]]))) submission_rows = db_utils.select_rows_by_list_of_field_values( - datastore, "submissions", ["prolific_study_id"], [study_ids], + datastore, + "submissions", + ["prolific_study_id"], + [study_ids], ) dump_data["submissions"] = db_utils.serialize_data_for_table(submission_rows) # Find and serialize `participant_groups` - participant_group_ids = list(set(filter(bool, [ - i["prolific_participant_group_id"] for i in dump_data["qualifications"] - ]))) + participant_group_ids = list( + set(filter(bool, [i["prolific_participant_group_id"] for i in dump_data["qualifications"]])) + ) participant_group_rows = db_utils.select_rows_by_list_of_field_values( - datastore, "participant_groups", ["prolific_participant_group_id"], [participant_group_ids], + datastore, + "participant_groups", + ["prolific_participant_group_id"], + [participant_group_ids], ) dump_data["participant_groups"] = db_utils.serialize_data_for_table(participant_group_rows) @@ -59,10 +67,13 @@ def export_datastore( worker_ids = [i["worker_name"] for i in mephisto_db_data["workers"]] if worker_ids: worker_rows = db_utils.select_rows_by_list_of_field_values( - datastore, "workers", ["worker_id"], [worker_ids], + datastore, + "workers", + ["worker_id"], + [worker_ids], ) else: - worker_rows = db_utils.select_all_table_rows(datastore, "workers") + worker_rows = [] dump_data["workers"] = db_utils.serialize_data_for_table(worker_rows) return dump_data diff --git a/mephisto/client/cli.py b/mephisto/client/cli.py index ef42dca60..6f6c280ce 100644 --- a/mephisto/client/cli.py +++ b/mephisto/client/cli.py @@ -31,6 +31,7 @@ import mephisto.scripts.mturk.launch_makeup_hits as 
launch_makeup_hits_mturk import mephisto.scripts.mturk.print_outstanding_hit_status as soft_block_workers_by_mturk_id_mturk from mephisto.client.cli_commands import get_wut_arguments +from mephisto.client.cli_db_commands import db_cli from mephisto.generators.form_composer.config_validation.separate_token_values_config import ( update_separate_token_values_config_with_file_urls, ) @@ -49,8 +50,6 @@ set_custom_validators_js_env_var, ) from mephisto.operations.registry import get_valid_provider_types -from mephisto.tools.db_data_porter import DBDataPorter -from mephisto.tools.db_data_porter.constants import DEFAULT_CONFLICT_RESOLVER from mephisto.tools.scripts import build_custom_bundle from mephisto.utils.console_writer import ConsoleWriter from mephisto.utils.rich import console @@ -81,8 +80,8 @@ def cli(): @cli.command("config", cls=RichCommand) -@click.argument("identifier", type=(str), default=None, required=False) -@click.argument("value", type=(str), default=None, required=False) +@click.argument("identifier", type=str, default=None, required=False) +@click.argument("value", type=str, default=None, required=False) def config(identifier, value): from mephisto.operations.config_handler import ( get_config_arg, @@ -197,6 +196,7 @@ def register_provider(args): try: parsed_options = parse_arg_dict(RequesterClass, args_dict) except Exception as e: + parsed_options = None click.echo(str(e)) if parsed_options.name is None: @@ -364,11 +364,11 @@ def metrics_cli(args): @cli.command("review_app", cls=RichCommand) -@click.option("-h", "--host", type=(str), default="127.0.0.1") -@click.option("-p", "--port", type=(int), default=5000) -@click.option("-d", "--debug", type=(bool), default=False, is_flag=True) -@click.option("-f", "--force-rebuild", type=(bool), default=False, is_flag=True) -@click.option("-s", "--skip-build", type=(bool), default=False, is_flag=True) +@click.option("-h", "--host", type=str, default="127.0.0.1") +@click.option("-p", "--port", type=int, default=5000) +@click.option("-d", "--debug", type=bool, default=False, is_flag=True) +@click.option("-f", "--force-rebuild", type=bool, default=False, is_flag=True) +@click.option("-s", "--skip-build", type=bool, default=False, is_flag=True) @pass_script_info def review_app( info: ScriptInfo, @@ -462,7 +462,7 @@ def _get_form_composer_app_path() -> str: @cli.command("form_composer", cls=RichCommand) -@click.option("-o", "--task-data-config-only", type=(bool), default=True, is_flag=True) +@click.option("-o", "--task-data-config-only", type=bool, default=True, is_flag=True) def form_composer(task_data_config_only: bool = True): # Get app path to run Python script from there (instead of the current file's directory). 
# This is necessary, because the whole infrastructure is built relative to the location @@ -501,12 +501,12 @@ def form_composer(task_data_config_only: bool = True): @cli.command("form_composer_config", cls=RichCommand) -@click.option("-v", "--verify", type=(bool), default=False, is_flag=True) -@click.option("-f", "--update-file-location-values", type=(str), default=None) -@click.option("-e", "--extrapolate-token-sets", type=(bool), default=False, is_flag=True) -@click.option("-p", "--permutate-separate-tokens", type=(bool), default=False, is_flag=True) -@click.option("-d", "--directory", type=(str), default=None) -@click.option("-u", "--use-presigned-urls", type=(bool), default=False, is_flag=True) +@click.option("-v", "--verify", type=bool, default=False, is_flag=True) +@click.option("-f", "--update-file-location-values", type=str, default=None) +@click.option("-e", "--extrapolate-token-sets", type=bool, default=False, is_flag=True) +@click.option("-p", "--permutate-separate-tokens", type=bool, default=False, is_flag=True) +@click.option("-d", "--directory", type=str, default=None) +@click.option("-u", "--use-presigned-urls", type=bool, default=False, is_flag=True) def form_composer_config( verify: Optional[bool] = False, update_file_location_values: Optional[str] = None, @@ -623,189 +623,7 @@ def form_composer_config( ) -@cli.command("db", cls=RichCommand) -@click.argument("action_name", required=True, nargs=1) -@click.option("-d", "--dump-file", type=(str), default=None) -@click.option("-i", "--export-indent", type=(int), default=None) -@click.option("-tn", "--export-tasks-by-names", type=(str), multiple=True, default=None) -@click.option("-ti", "--export-tasks-by-ids", type=(str), multiple=True, default=None) -@click.option("-tr", "--export-task-runs-by-ids", type=(str), multiple=True, default=None) -@click.option("-trs", "--export-task-runs-since-date", type=(str), default=None) -@click.option("-tl", "--export-labels", type=(str), multiple=True, default=None) -@click.option("-de", "--delete-exported-data", type=(bool), default=False, is_flag=True) -@click.option("-r", "--randomize-legacy-ids", type=(bool), default=False, is_flag=True) -@click.option("-l", "--label-name", type=(str), default=None) -@click.option("-cr", "--conflict-resolver", type=(str), default=DEFAULT_CONFLICT_RESOLVER) -@click.option("-k", "--keep-import-metadata", type=(bool), default=False, is_flag=True) -@click.option("-b", "--backup-file", type=(str), default=None) -@click.option("-v", "--verbosity", type=(int), default=0) -def db( - action_name: str, - dump_file: Optional[str] = None, - export_indent: Optional[int] = None, - export_tasks_by_names: Optional[List[str]] = None, - export_tasks_by_ids: Optional[List[str]] = None, - export_task_runs_by_ids: Optional[List[str]] = None, - export_task_runs_since_date: Optional[str] = None, - export_labels: Optional[List[str]] = None, - delete_exported_data: bool = False, - randomize_legacy_ids: bool = False, - label_name: Optional[str] = None, - conflict_resolver: Optional[str] = DEFAULT_CONFLICT_RESOLVER, - keep_import_metadata: Optional[bool] = False, - backup_file: Optional[str] = None, - verbosity: int = 0, -): - """ - Operations with Mephisto DB and provider-specific datastores. - - Commands: - 1. mephisto db export - This command exports data from Mephisto DB and provider-specific datastores - as a combination of (i) a JSON file, and (ii) an archived `data` catalog with related files. - - If no parameter passed, full data dump (i.e. backup) will be created. 
- - To pass a list of values for one command option, - simply repeat that option name before each value. - - Options (all optional): - `-tn/--export-tasks-by-names` - names of Tasks that will be exported - `-ti/--export-tasks-by-ids` - ids of Tasks that will be exported - `-tr/--export-task-runs-by-ids` - ids of TaskRuns that will be exported - `-trs/--export-task-runs-since-date` - only objects created after this - ISO8601 datetime will be exported - `-tl/--export-labels` - only data imported under these labels will be exported - `-de/--delete-exported-data` - after exporting data, delete it from local DB - `-r/--randomize-legacy-ids` - replace legacy autoincremented ids with - new pseudo-random ids to avoid conflicts during data merging - `-i/--export-indent` - make dump easy to read via formatting JSON with indentations - `-v/--verbosity` - write more informative messages about progress - (Default 0. Values: 0, 1) - - - 2. mephisto db import --dump-file - - This command imports data from a dump file created by `mephisto db export` command. - - Options: - `-d/--dump-file` - location of the __***.json__ dump file (filename if created in - `/outputs/export` folder, or absolute filepath) - `-cr/--conflict-resolver` (Optional) - name of Python class - to be used for resolving merging conflicts (when your local DB already has a row - with same unique field value as a DB row in the dump data) - `-l/--label-name` - a short string serving as a reference for the ported data - (stored in `imported_data` table), so later you can export the imported data - with `--export-labels` export option - `-k/--keep-import-metadata` - write data from `imported_data` table of the dump - (by default it's not imported) - `-v/--verbosity` - level of logging (default: 0; values: 0, 1) - - 3. mephisto db backup - - Creates full backup of all current data (Mephisto DB, provider-specific datastores, - and related files) on local machine. - - 4. mephisto db restore --backup-file - - Restores all data (Mephisto DB, provider-specific datastores, and related files) - from a backup archive. - - Options: - `-b/--backup-file` - location of the __*.zip__ backup file (filename if created in - `/outputs/backup` folder, or absolute filepath) - `-v/--verbosity` - level of logging (default: 0; values: 0, 1) - """ - porter = DBDataPorter() - - # --- EXPORT --- - if action_name == "export": - has_conflicting_task_runs_options = len(list(filter(bool, [ - export_tasks_by_names, - export_tasks_by_ids, - export_task_runs_by_ids, - export_task_runs_since_date, - export_labels, - ]))) > 1 - - if has_conflicting_task_runs_options: - logger.warning( - "[yellow]" - "You cannot use following options together:" - "\n\t--export-tasks-by-names" - "\n\t--export-tasks-by-ids" - "\n\t--export-task-runs-by-ids" - "\n\t--export-task-runs-since-date" - "\n\t--export-labels" - "\nUse one of them or none of them to export all data." 
- "[/yellow]" - ) - exit() - - logger.info(f"Started exporting") - - export_results = porter.export_dump( - json_indent=export_indent, - task_names=export_tasks_by_names, - task_ids=export_tasks_by_ids, - task_run_ids=export_task_runs_by_ids, - task_runs_since_date=export_task_runs_since_date, - task_runs_labels=export_labels, - delete_exported_data=delete_exported_data, - randomize_legacy_ids=randomize_legacy_ids, - verbosity=verbosity, - ) - - data_files_line = "" - if export_results["data_path"]: - data_files_line = f"\n\t- Data files dump - {export_results['data_path']}" - - backup_line = "" - if export_results["backup_path"]: - backup_line = f"\n\t- Backup - {export_results['backup_path']}" - - logger.info( - f"[green]" - f"Finished successfully! " - f"\nFiles created:" - f"\n\t- Database dump - {export_results['db_path']}" - f"{data_files_line}" - f"{backup_line}" - f"[/green]" - ) - - # --- IMPORT --- - elif action_name == "import": - logger.info(f"Started importing from dump '{dump_file}'") - porter.import_dump( - dump_file_name_or_path=dump_file, - conflict_resolver_name=conflict_resolver, - label=label_name, - keep_import_metadata=keep_import_metadata, - verbosity=verbosity, - ) - logger.info(f"[green]Finished successfully[/green]") - - # --- BACKUP --- - elif action_name == "backup": - logger.info(f"Started making backup") - backup_path = porter.make_backup() - logger.info(f"[green]Finished successfully! File: '{backup_path}[/green]") - - # --- RESTORE --- - elif action_name == "restore": - logger.info(f"Started restoring from backup '{backup_file}'") - porter.restore_from_backup(backup_file_name_or_path=backup_file, verbosity=verbosity) - logger.info(f"[green]Finished successfully[/green]") - - # Otherwise, error - else: - logger.error( - f"[red]" - f"Unexpected action name '{action_name}'. Available: export, import, restore." - f"[/red]" - ) - exit() +cli.add_command(db_cli) if __name__ == "__main__": diff --git a/mephisto/client/cli_db_commands.py b/mephisto/client/cli_db_commands.py new file mode 100644 index 000000000..17790ec14 --- /dev/null +++ b/mephisto/client/cli_db_commands.py @@ -0,0 +1,318 @@ +#!/usr/bin/env python3 + +# Copyright (c) Meta Platforms and its affiliates. +# This source code is licensed under the MIT license found in the +# LICENSE file in the root directory of this source tree. + +from typing import List +from typing import Optional + +import click +from rich_click import RichCommand +from rich_click import RichContext + +from mephisto.tools.db_data_porter import DBDataPorter +from mephisto.tools.db_data_porter.constants import DEFAULT_CONFLICT_RESOLVER +from mephisto.tools.db_data_porter.export_dump import get_export_options_for_metadata +from mephisto.utils.console_writer import ConsoleWriter + +VERBOSITY_HELP = "write more informative messages about progress (Default 0. 
Values: 0, 1)" +VERBOSITY_DEFAULT_VALUE = 0 + +logger = ConsoleWriter() + + +def _print_used_options_for_running_command_message(ctx: RichContext, options: dict): + message = "Running command with the following options:\n" + for p in ctx.command.params: + values = options[p.name] + + if isinstance(values, tuple): + values = list(values) + if not values: + values = None + + message += f"\t{'/'.join(p.opts)} = {values}\n" + + logger.debug(message) + + +@click.group(name="db", context_settings=dict(help_option_names=["-h", "--help"])) +def db_cli(): + """Operations with Mephisto DB and provider-specific datastores.""" + pass + + +# --- EXPORT --- +@db_cli.command("export", cls=RichCommand) +@click.pass_context +@click.option( + "-i", + "--export-indent", + type=int, + default=2, + help="make dump easy to read via formatting JSON with indentations (Default 2)", +) +@click.option( + "-tn", + "--export-tasks-by-names", + type=str, + multiple=True, + default=None, + help="names of Tasks that will be exported", +) +@click.option( + "-ti", + "--export-tasks-by-ids", + type=str, + multiple=True, + default=None, + help="ids of Tasks that will be exported", +) +@click.option( + "-tri", + "--export-task-runs-by-ids", + type=str, + multiple=True, + default=None, + help="ids of TaskRuns that will be exported", +) +@click.option( + "-trs", + "--export-task-runs-since-date", + type=str, + default=None, + help="only objects created after this ISO8601 datetime will be exported", +) +@click.option( + "-l", + "--labels", + type=str, + multiple=True, + default=None, + help="only data imported under these labels will be exported", +) +@click.option( + "-del", + "--delete-exported-data", + type=bool, + default=False, + is_flag=True, + help="after exporting data, delete it from local DB", +) +@click.option( + "-r", + "--randomize-legacy-ids", + type=bool, + default=False, + is_flag=True, + help=( + "replace legacy autoincremented ids with new pseudo-random ids " + "to avoid conflicts during data merging" + ), +) +@click.option("-v", "--verbosity", type=int, default=VERBOSITY_DEFAULT_VALUE, help=VERBOSITY_HELP) +def export(ctx: RichContext, **options: dict): + """ + This command exports data from Mephisto DB and provider-specific datastores + as an archived combination of (i) a JSON file, and (ii) a `data` catalog with related files. + If no parameter passed, full data dump (i.e. backup) will be created. + To pass a list of values for one command option, + simply repeat that option name before each value. 
+ + mephisto db export + """ + _print_used_options_for_running_command_message(ctx, options) + + export_indent: Optional[int] = options.get("export_indent", 2) + export_tasks_by_names: Optional[List[str]] = options.get("export_tasks_by_names") + export_tasks_by_ids: Optional[List[str]] = options.get("export_tasks_by_ids") + export_task_runs_by_ids: Optional[List[str]] = options.get("export_task_runs_by_ids") + export_task_runs_since_date: Optional[str] = options.get("export_task_runs_since_date") + export_labels: Optional[List[str]] = options.get("labels") + delete_exported_data: bool = options.get("delete_exported_data", False) + randomize_legacy_ids: bool = options.get("randomize_legacy_ids", False) + verbosity: int = options.get("verbosity", VERBOSITY_DEFAULT_VALUE) + + porter = DBDataPorter() + + has_conflicting_task_runs_options = ( + len( + list( + filter( + bool, + [ + export_tasks_by_names, + export_tasks_by_ids, + export_task_runs_by_ids, + export_task_runs_since_date, + export_labels, + ], + ) + ) + ) + > 1 + ) + + if has_conflicting_task_runs_options: + logger.warning( + "[yellow]" + "You cannot use the following options together:" + "\n\t--export-tasks-by-names" + "\n\t--export-tasks-by-ids" + "\n\t--export-task-runs-by-ids" + "\n\t--export-task-runs-since-date" + "\n\t--labels" + "\nUse one of them or none of them to export all data." + "[/yellow]" + ) + exit() + + export_results = porter.export_dump( + json_indent=export_indent, + task_names=export_tasks_by_names, + task_ids=export_tasks_by_ids, + task_run_ids=export_task_runs_by_ids, + task_runs_since_date=export_task_runs_since_date, + task_runs_labels=export_labels, + delete_exported_data=delete_exported_data, + randomize_legacy_ids=randomize_legacy_ids, + metadata_export_options=get_export_options_for_metadata(ctx, options), + verbosity=verbosity, + ) + + backup_line = "" + if export_results["backup_path"]: + backup_line = f"\nCreated backup file (just in case): {export_results['backup_path']}" + + logger.info( + f"[green]" + f"Finished successfully, saved to file: {export_results['dump_path']}" + f"{backup_line}" + f"[/green]" + ) + + +# --- IMPORT --- +@db_cli.command("import", cls=RichCommand) +@click.pass_context +@click.option( + "-f", + "--file", + type=str, + default=None, + help=( + "location of the `***.zip` dump file " + "(filename if created in `/outputs/export` folder, or absolute filepath)" + ), +) +@click.option( + "-l", + "--labels", + type=str, + multiple=True, + default=None, + help=( + "one or more short strings serving as a reference for the ported data " + "(stored in `imported_data` table), " + "so later you can export the imported data with `--labels` export option" + ), +) +@click.option( + "-cr", + "--conflict-resolver", + type=str, + default=DEFAULT_CONFLICT_RESOLVER, + help=( + "(Optional) name of Python class to be used for resolving merging conflicts " + "(when your local DB already has a row with same unique field value " + "as a DB row in the dump data)" + ), +) +@click.option( + "-k", + "--keep-import-metadata", + type=bool, + default=False, + is_flag=True, + help="write data from `imported_data` table of the dump (by default it's not imported)", +) +@click.option("-v", "--verbosity", type=int, default=VERBOSITY_DEFAULT_VALUE, help=VERBOSITY_HELP) +def _import(ctx: RichContext, **options: dict): + """ + This command imports data from a dump file created by `mephisto db export` command.
+ + mephisto db import --file + """ + _print_used_options_for_running_command_message(ctx, options) + + file: Optional[str] = options.get("file") + labels: Optional[List[str]] = options.get("labels") + conflict_resolver: Optional[str] = options.get("conflict_resolver", DEFAULT_CONFLICT_RESOLVER) + keep_import_metadata: Optional[bool] = options.get("keep_import_metadata", False) + verbosity: int = options.get("verbosity", VERBOSITY_DEFAULT_VALUE) + + porter = DBDataPorter() + results = porter.import_dump( + dump_archive_file_name_or_path=file, + conflict_resolver_name=conflict_resolver, + labels=labels, + keep_import_metadata=keep_import_metadata, + verbosity=verbosity, + ) + logger.info( + f"[green]" + f"Finished successfully. Imported {results['imported_task_runs_number']} TaskRuns" + f"[/green]" + ) + + +# --- BACKUP --- +@db_cli.command("backup", cls=RichCommand) +@click.pass_context +@click.option("-v", "--verbosity", type=int, default=VERBOSITY_DEFAULT_VALUE, help=VERBOSITY_HELP) +def backup(ctx: RichContext, **options: dict): + """ + Creates a full backup of all current data (Mephisto DB, provider-specific datastores, + and related files) on the local machine. + + mephisto db backup + """ + _print_used_options_for_running_command_message(ctx, options) + + verbosity: int = options.get("verbosity", VERBOSITY_DEFAULT_VALUE) + + porter = DBDataPorter() + backup_path = porter.create_backup(verbosity=verbosity) + logger.info(f"[green]Finished successfully, saved to file: {backup_path}[/green]") + + +# --- RESTORE --- +@db_cli.command("restore", cls=RichCommand) +@click.pass_context +@click.option( + "-f", + "--file", + type=str, + default=None, + help=( + "location of the `***.zip` backup file (filename if created in " + "`/outputs/backup` folder, or absolute filepath)" + ), +) +@click.option("-v", "--verbosity", type=int, default=VERBOSITY_DEFAULT_VALUE, help=VERBOSITY_HELP) +def restore(ctx: RichContext, **options): + """ + Restores all data (Mephisto DB, provider-specific datastores, and related files) + from a backup archive.
+
+
+# --- BACKUP ---
+@db_cli.command("backup", cls=RichCommand)
+@click.pass_context
+@click.option("-v", "--verbosity", type=int, default=VERBOSITY_DEFAULT_VALUE, help=VERBOSITY_HELP)
+def backup(ctx: RichContext, **options: dict):
+    """
+    Creates full backup of all current data (Mephisto DB, provider-specific datastores,
+    and related files) on local machine.
+
+    mephisto db backup
+    """
+    _print_used_options_for_running_command_message(ctx, options)
+
+    verbosity: int = options.get("verbosity", VERBOSITY_DEFAULT_VALUE)
+
+    porter = DBDataPorter()
+    backup_path = porter.create_backup(verbosity=verbosity)
+    logger.info(f"[green]Finished successfully, saved to file: {backup_path}[/green]")
+
+
+# --- RESTORE ---
+@db_cli.command("restore", cls=RichCommand)
+@click.pass_context
+@click.option(
+    "-f",
+    "--file",
+    type=str,
+    default=None,
+    help=(
+        "location of the `***.zip` backup file (filename if created in "
+        "`/outputs/backup` folder, or absolute filepath)"
+    ),
+)
+@click.option("-v", "--verbosity", type=int, default=VERBOSITY_DEFAULT_VALUE, help=VERBOSITY_HELP)
+def restore(ctx: RichContext, **options):
+    """
+    Restores all data (Mephisto DB, provider-specific datastores, and related files)
+    from a backup archive.
+
+    mephisto db restore --file
+    """
+    _print_used_options_for_running_command_message(ctx, options)
+
+    file: Optional[str] = options.get("file")
+    verbosity: int = options.get("verbosity", VERBOSITY_DEFAULT_VALUE)
+
+    porter = DBDataPorter()
+    porter.restore_from_backup(backup_file_name_or_path=file, verbosity=verbosity)
+    logger.info(f"[green]Finished successfully[/green]")
diff --git a/mephisto/tools/db_data_porter/backups.py b/mephisto/tools/db_data_porter/backups.py
index 9222917ad..a77e15a50 100644
--- a/mephisto/tools/db_data_porter/backups.py
+++ b/mephisto/tools/db_data_porter/backups.py
@@ -6,165 +6,31 @@
 import os
 import shutil
-from distutils.dir_util import copy_tree
 from pathlib import Path
-from typing import List
 
-from mephisto.abstractions.database import MephistoDB
-from mephisto.data_model.task_run import TaskRun
-from mephisto.tools.db_data_porter.constants import AGENTS_TABLE_NAME
-from mephisto.tools.db_data_porter.constants import ASSIGNMENTS_TABLE_NAME
-from mephisto.tools.db_data_porter.constants import MEPHISTO_DUMP_KEY
-from mephisto.tools.db_data_porter.constants import TASK_RUNS_TABLE_NAME
-from mephisto.tools.db_data_porter.randomize_ids import get_old_pk_from_substitutions
-from mephisto.utils import db as db_utils
+from mephisto.tools.db_data_porter.constants import DEFAULT_ARCHIVE_FORMAT
 from mephisto.utils.console_writer import ConsoleWriter
 from mephisto.utils.dirs import get_data_dir
-from mephisto.utils.dirs import get_mephisto_tmp_dir
-
-DEFAULT_ARCHIVE_FORMAT = "zip"
 
 logger = ConsoleWriter()
 
 
-def _rename_dirs_with_new_pks(task_run_dirs: List[str], pk_substitutions: dict):
-    def rename_dir_with_new_pk(dir_path: str, substitutions: dict) -> str:
-        dump_id = substitutions.get(os.path.basename(dir_path))
-        renamed_dir_path = os.path.join(os.path.dirname(dir_path), dump_id)
-        os.rename(dir_path, renamed_dir_path)
-        return renamed_dir_path
-
-    task_runs_subs = pk_substitutions.get(MEPHISTO_DUMP_KEY, {}).get(TASK_RUNS_TABLE_NAME, {})
-    if not task_runs_subs:
-        # Nothing to rename
-        return
-
-    assignment_subs = pk_substitutions.get(MEPHISTO_DUMP_KEY, {}).get(ASSIGNMENTS_TABLE_NAME, {})
-    agent_subs = pk_substitutions.get(MEPHISTO_DUMP_KEY, {}).get(AGENTS_TABLE_NAME, {})
-
-    task_run_dirs = [
-        d for d in task_run_dirs if os.path.basename(d) in task_runs_subs.keys()
-    ]
-    for task_run_dir in task_run_dirs:
-        # Rename TaskRun dir
-        renamed_task_run_dir = rename_dir_with_new_pk(task_run_dir, task_runs_subs)
-
-        # Rename Assignments dirs
-        assignments_dirs = [
-            os.path.join(renamed_task_run_dir, d) for d in os.listdir(renamed_task_run_dir)
-            if d in assignment_subs.keys()
-        ]
-        for assignment_dir in assignments_dirs:
-            renamed_assignment_dir = rename_dir_with_new_pk(assignment_dir, assignment_subs)
-
-            # Rename Agents dirs
-            agents_dirs = [
-                os.path.join(renamed_assignment_dir, d) for d in os.listdir(renamed_assignment_dir)
-                if d in agent_subs.keys()
-            ]
-            for agent_dir in agents_dirs:
-                rename_dir_with_new_pk(agent_dir, agent_subs)
-
-
-def _export_data_dir_for_task_runs(
-    input_dir_path: str,
-    archive_file_path_without_ext: str,
-    task_runs: List[TaskRun],
-    pk_substitutions: dict,
-    _format: str = DEFAULT_ARCHIVE_FORMAT,
-    verbosity: int = 0,
-) -> bool:
-    tmp_dir = get_mephisto_tmp_dir()
-    tmp_export_dir = os.path.join(tmp_dir, "export")
-
-    task_run_data_dirs = [i.get_run_dir() for i in task_runs]
-    if not task_run_data_dirs:
-        return False
-
-    try:
-        tmp_task_run_dirs = []
-
-        # Copy all files for passed TaskRuns into tmp dir
-        for task_run_data_dir in 
task_run_data_dirs: - relative_dir = Path(task_run_data_dir).relative_to(input_dir_path) - tmp_task_run_dir = os.path.join(tmp_export_dir, relative_dir) - - tmp_task_run_dirs.append(tmp_task_run_dir) - - os.makedirs(tmp_task_run_dir, exist_ok=True) - copy_tree(task_run_data_dir, tmp_task_run_dir, verbose=verbosity) - - _rename_dirs_with_new_pks(tmp_task_run_dirs, pk_substitutions) - - # Create archive in export dir - shutil.make_archive( - base_name=archive_file_path_without_ext, - format="zip", - root_dir=tmp_export_dir, - ) - finally: - # Remove tmp dir - if os.path.exists(tmp_export_dir): - shutil.rmtree(tmp_export_dir) - - return True - - def make_backup_file_path_by_timestamp( - backup_dir: str, timestamp: str, _format: str = DEFAULT_ARCHIVE_FORMAT, + backup_dir: str, + timestamp: str, + _format: str = DEFAULT_ARCHIVE_FORMAT, ) -> str: return os.path.join(backup_dir, f"{timestamp}_mephisto_backup.{_format}") -def make_full_data_dir_backup( - backup_dir: str, timestamp: str, _format: str = DEFAULT_ARCHIVE_FORMAT, -) -> str: +def make_full_data_dir_backup(backup_file_path: str, _format: str = DEFAULT_ARCHIVE_FORMAT) -> str: mephisto_data_dir = get_data_dir() - file_name_without_ext = f"{timestamp}_mephisto_backup" - archive_file_path_without_ext = os.path.join(backup_dir, file_name_without_ext) - shutil.make_archive( - base_name=archive_file_path_without_ext, + base_name=os.path.splitext(backup_file_path)[0], format=_format, root_dir=mephisto_data_dir, ) - - return make_backup_file_path_by_timestamp(backup_dir, file_name_without_ext, _format) - - -def archive_and_copy_data_files( - db: "MephistoDB", - export_dir: str, - dump_name: str, - dump_data: dict, - pk_substitutions: dict, - verbosity: int = 0, - _format: str = DEFAULT_ARCHIVE_FORMAT, -) -> bool: - mephisto_data_files_path = os.path.join(get_data_dir(), "data") - output_zip_file_base_name = os.path.join(export_dir, dump_name) # name without extension - - # Get TaskRuns for PKs in dump - task_runs: List[TaskRun] = [] - for dump_task_run in dump_data[MEPHISTO_DUMP_KEY][TASK_RUNS_TABLE_NAME]: - task_runs_pk_field_name = db_utils.get_table_pk_field_name(db, TASK_RUNS_TABLE_NAME) - dump_pk = dump_task_run[task_runs_pk_field_name] - db_pk = get_old_pk_from_substitutions(dump_pk, pk_substitutions, TASK_RUNS_TABLE_NAME) - db_pk = db_pk or dump_pk - task_run: TaskRun = TaskRun.get(db, db_pk) - task_runs.append(task_run) - - # Export archived related data files to TaskRuns from dump - exported = _export_data_dir_for_task_runs( - input_dir_path=mephisto_data_files_path, - archive_file_path_without_ext=output_zip_file_base_name, - task_runs=task_runs, - pk_substitutions=pk_substitutions, - _format=_format, - verbosity=verbosity, - ) - - return exported + return backup_file_path def restore_from_backup( @@ -180,4 +46,4 @@ def restore_from_backup( Path(backup_file_path).unlink(missing_ok=True) except Exception as e: logger.exception(f"[red]Could not restore backup '{backup_file_path}'. 
Error: {e}[/red]") - raise + exit() diff --git a/mephisto/tools/db_data_porter/conflict_resolvers/__init__.py b/mephisto/tools/db_data_porter/conflict_resolvers/__init__.py index 86a1a7d02..96190d3af 100644 --- a/mephisto/tools/db_data_porter/conflict_resolvers/__init__.py +++ b/mephisto/tools/db_data_porter/conflict_resolvers/__init__.py @@ -21,8 +21,8 @@ attribute = getattr(module, attribute_name) if ( - isclass(attribute) and - issubclass(attribute, BaseMergeConflictResolver) and - attribute is not BaseMergeConflictResolver + isclass(attribute) + and issubclass(attribute, BaseMergeConflictResolver) + and attribute is not BaseMergeConflictResolver ): globals().update({attribute.__name__: attribute}) diff --git a/mephisto/tools/db_data_porter/conflict_resolvers/base_merge_conflict_resolver.py b/mephisto/tools/db_data_porter/conflict_resolvers/base_merge_conflict_resolver.py index da5dbb5e0..46c12ecaa 100644 --- a/mephisto/tools/db_data_porter/conflict_resolvers/base_merge_conflict_resolver.py +++ b/mephisto/tools/db_data_porter/conflict_resolvers/base_merge_conflict_resolver.py @@ -47,7 +47,10 @@ def __init__(self, db: "MephistoDB", provider_type: str): @staticmethod def _merge_rows_after_resolving( - table_pk_field_name: str, db_row: dict, dump_row: dict, resolved_row: dict, + table_pk_field_name: str, + db_row: dict, + dump_row: dict, + resolved_row: dict, ) -> dict: """ After we've resolved merging conflicts with rows fields, @@ -81,32 +84,37 @@ def _merge_rows_after_resolving( @staticmethod def _serialize_compared_fields_in_rows( - db_row: dict, dump_row: dict, compared_field_name: str, + db_row: dict, + dump_row: dict, + row_field_name: str, ) -> Tuple[dict, dict]: - db_value = db_row[compared_field_name] - dump_value = dump_row[compared_field_name] + db_value = db_row[row_field_name] + dump_value = dump_row[row_field_name] # Date fields - if compared_field_name.endswith("_at") or compared_field_name.endswith("_date"): - db_row[compared_field_name] = serialize_date_to_python(db_value) - dump_row[compared_field_name] = serialize_date_to_python(dump_value) + if row_field_name.endswith("_at") or row_field_name.endswith("_date"): + db_row[row_field_name] = serialize_date_to_python(db_value) + dump_row[row_field_name] = serialize_date_to_python(dump_value) # Numeric fields (integer or float) # Note: We cast both compared values to a numeric type # ONLY when one value is numeric, and another one is a string # (to avoid, for example, casting float to integer) for _type in [int, float]: - if ( - (isinstance(db_value, _type) and isinstance(dump_value, str)) or - (isinstance(db_value, str) and isinstance(dump_value, _type)) + if (isinstance(db_value, _type) and isinstance(dump_value, str)) or ( + isinstance(db_value, str) and isinstance(dump_value, _type) ): - db_row[compared_field_name] = _type(db_value) - dump_row[compared_field_name] = _type(dump_value) + db_row[row_field_name] = _type(db_value) + dump_row[row_field_name] = _type(dump_value) return db_row, dump_row def resolve( - self, table_name: str, table_pk_field_name: str, db_row: dict, dump_row: dict, + self, + table_name: str, + table_pk_field_name: str, + db_row: dict, + dump_row: dict, ) -> dict: """ Default logic of validating `strategies_config`, @@ -144,7 +152,10 @@ def resolve( # 4. Merge data merged_row = self._merge_rows_after_resolving( - table_pk_field_name, db_row, dump_row, resolved_row, + table_pk_field_name, + db_row, + dump_row, + resolved_row, ) # 4. 
Return merged row @@ -152,13 +163,18 @@ def resolve( # --- Prepared most cummon strategies --- def pick_row_with_smaller_value( - self, db_row: dict, dump_row: dict, compared_field_name: str, + self, + db_row: dict, + dump_row: dict, + row_field_name: str, ) -> dict: db_row, dump_row = self._serialize_compared_fields_in_rows( - db_row, dump_row, compared_field_name, + db_row, + dump_row, + row_field_name, ) - db_value = db_row[compared_field_name] - dump_value = dump_row[compared_field_name] + db_value = db_row[row_field_name] + dump_value = dump_row[row_field_name] # None cannot be compared with anything if db_value is None: @@ -172,13 +188,18 @@ def pick_row_with_smaller_value( return dump_row def pick_row_with_larger_value( - self, db_row: dict, dump_row: dict, compared_field_name: str, + self, + db_row: dict, + dump_row: dict, + row_field_name: str, ) -> dict: db_row, dump_row = self._serialize_compared_fields_in_rows( - db_row, dump_row, compared_field_name, + db_row, + dump_row, + row_field_name, ) - db_value = db_row[compared_field_name] - dump_value = dump_row[compared_field_name] + db_value = db_row[row_field_name] + dump_value = dump_row[row_field_name] # None cannot be compared with anything if db_value is None: @@ -192,23 +213,34 @@ def pick_row_with_larger_value( return dump_row def pick_row_from_db( - self, db_row: dict, dump_row: dict, compared_field_name: Optional[str] = None, + self, + db_row: dict, + dump_row: dict, + row_field_name: Optional[str] = None, ) -> dict: return db_row def pick_row_from_dump( - self, db_row: dict, dump_row: dict, compared_field_name: Optional[str] = None, + self, + db_row: dict, + dump_row: dict, + row_field_name: Optional[str] = None, ) -> dict: return dump_row def pick_row_with_earlier_value( - self, db_row: dict, dump_row: dict, compared_field_name: str = "creation_date", + self, + db_row: dict, + dump_row: dict, + row_field_name: str = "creation_date", ) -> dict: db_row, dump_row = self._serialize_compared_fields_in_rows( - db_row, dump_row, compared_field_name, + db_row, + dump_row, + row_field_name, ) - db_value = db_row[compared_field_name] - dump_value = dump_row[compared_field_name] + db_value = db_row[row_field_name] + dump_value = dump_row[row_field_name] # None cannot be compared with anything if db_value is None: @@ -221,13 +253,18 @@ def pick_row_with_earlier_value( return dump_row def pick_row_with_later_value( - self, db_row: dict, dump_row: dict, compared_field_name: str = "creation_date", + self, + db_row: dict, + dump_row: dict, + row_field_name: str = "creation_date", ) -> dict: db_row, dump_row = self._serialize_compared_fields_in_rows( - db_row, dump_row, compared_field_name, + db_row, + dump_row, + row_field_name, ) - db_value = db_row[compared_field_name] - dump_value = dump_row[compared_field_name] + db_value = db_row[row_field_name] + dump_value = dump_row[row_field_name] # None cannot be compared with anything if db_value is None: diff --git a/mephisto/tools/db_data_porter/conflict_resolvers/default_merge_conflict_resolver.py b/mephisto/tools/db_data_porter/conflict_resolvers/default_merge_conflict_resolver.py index 85a2bce68..ab7743913 100644 --- a/mephisto/tools/db_data_porter/conflict_resolvers/default_merge_conflict_resolver.py +++ b/mephisto/tools/db_data_porter/conflict_resolvers/default_merge_conflict_resolver.py @@ -26,7 +26,7 @@ class DefaultMergeConflictResolver(BaseMergeConflictResolver): # Go with more restrictive value "method": "pick_row_with_smaller_value", "kwargs": { - "compared_field_name": "value", 
+ "row_field_name": "value", }, }, }, @@ -36,7 +36,7 @@ class DefaultMergeConflictResolver(BaseMergeConflictResolver): # Note that `is_blocked` is SQLite-boolean, which is an integer in Python "method": "pick_row_with_larger_value", "kwargs": { - "compared_field_name": "is_blocked", + "row_field_name": "is_blocked", }, }, }, diff --git a/mephisto/tools/db_data_porter/conflict_resolvers/example_merge_conflict_resolver.py b/mephisto/tools/db_data_porter/conflict_resolvers/example_merge_conflict_resolver.py new file mode 100644 index 000000000..6cf5c3ebb --- /dev/null +++ b/mephisto/tools/db_data_porter/conflict_resolvers/example_merge_conflict_resolver.py @@ -0,0 +1,72 @@ +#!/usr/bin/env python3 + +# Copyright (c) Meta Platforms and its affiliates. +# This source code is licensed under the MIT license found in the +# LICENSE file in the root directory of this source tree. + +from datetime import datetime +from typing import Optional + +from rich import print as rich_print + +from mephisto.tools.db_data_porter.constants import MEPHISTO_DUMP_KEY +from mephisto.tools.db_data_porter.constants import MOCK_PROVIDER_TYPE +from mephisto.tools.db_data_porter.constants import MTURK_PROVIDER_TYPE +from mephisto.tools.db_data_porter.constants import PROLIFIC_PROVIDER_TYPE +from .base_merge_conflict_resolver import BaseMergeConflictResolver + + +class ExampleMergeConflictResolver(BaseMergeConflictResolver): + """ + Example how to write your own conflict resolver. + + NOTE: do not accidentally use this example resolver on your real data. + """ + + default_strategy_name = "pick_row_from_db_and_set_creation_date_to_y2k" + + def pick_row_from_db_and_set_creation_date_to_y2k( + self, + db_row: dict, + dump_row: dict, + row_field_name: Optional[str] = None, + ) -> dict: + if "creation_date" in db_row: + db_row["creation_date"] = datetime(2000, 1, 1) + rich_print(f"\tSet `creation_date` to y2k for row {db_row}") + + return db_row + + def concatenate_values( + self, + db_row: dict, + dump_row: dict, + row_field_name: str, + separator: str, + ) -> dict: + resulting_row = self.pick_row_from_db_and_set_creation_date_to_y2k(db_row, dump_row) + + db_value = db_row[row_field_name] or "" + dump_value = dump_row[row_field_name] or "" + + if dump_value and db_value: + resulting_row[row_field_name] = db_value + separator + dump_value + rich_print(f"\tConcatenated `{row_field_name}` values for row {resulting_row}") + + return resulting_row + + strategies_config = { + MEPHISTO_DUMP_KEY: { + "tasks": { + # Concatenate names + "method": "concatenate_values", + "kwargs": { + "row_field_name": "task_name", + "separator": " + ", + }, + }, + }, + PROLIFIC_PROVIDER_TYPE: {}, + MOCK_PROVIDER_TYPE: {}, + MTURK_PROVIDER_TYPE: {}, + } diff --git a/mephisto/tools/db_data_porter/constants.py b/mephisto/tools/db_data_porter/constants.py index bea3047d4..ba40f7884 100644 --- a/mephisto/tools/db_data_porter/constants.py +++ b/mephisto/tools/db_data_porter/constants.py @@ -7,7 +7,7 @@ from mephisto.abstractions.providers.mock.provider_type import PROVIDER_TYPE as MOCK_PROVIDER_TYPE from mephisto.abstractions.providers.mturk.provider_type import PROVIDER_TYPE as MTURK_PROVIDER_TYPE from mephisto.abstractions.providers.prolific.provider_type import ( - PROVIDER_TYPE as PROLIFIC_PROVIDER_TYPE + PROVIDER_TYPE as PROLIFIC_PROVIDER_TYPE, ) @@ -213,3 +213,5 @@ # We mark rows in `imported_data` with labels and this label is used # if conflicted row was already presented in local DB LOCAL_DB_LABEL = "_" + +DEFAULT_ARCHIVE_FORMAT = "zip" diff --git 
a/mephisto/tools/db_data_porter/db_data_porter.py b/mephisto/tools/db_data_porter/db_data_porter.py index 237cfb235..bc916a820 100644 --- a/mephisto/tools/db_data_porter/db_data_porter.py +++ b/mephisto/tools/db_data_porter/db_data_porter.py @@ -6,19 +6,24 @@ import json import os +import zipfile from datetime import datetime from typing import Dict from typing import List from typing import Optional from typing import Union +from rich.console import Console + from mephisto.abstractions.database import MephistoDB from mephisto.abstractions.databases.local_database import LocalMephistoDB from mephisto.generators.form_composer.config_validation.utils import make_error_message from mephisto.tools.db_data_porter import backups +from mephisto.tools.db_data_porter import export_dump from mephisto.tools.db_data_porter import dumps from mephisto.tools.db_data_porter import import_dump from mephisto.tools.db_data_porter.constants import BACKUP_OUTPUT_DIR +from mephisto.tools.db_data_porter.constants import DEFAULT_ARCHIVE_FORMAT from mephisto.tools.db_data_porter.constants import EXPORT_OUTPUT_DIR from mephisto.tools.db_data_porter.constants import IMPORTED_DATA_TABLE_NAME from mephisto.tools.db_data_porter.constants import MEPHISTO_DUMP_KEY @@ -27,9 +32,9 @@ from mephisto.tools.db_data_porter.randomize_ids import randomize_ids from mephisto.tools.db_data_porter.validation import validate_dump_data from mephisto.utils import db as db_utils +from mephisto.utils.console_writer import ConsoleWriter from mephisto.utils.dirs import get_data_dir from mephisto.utils.misc import serialize_date_to_python -from mephisto.utils.console_writer import ConsoleWriter logger = ConsoleWriter() @@ -55,9 +60,9 @@ def __init__(self, db=None): @staticmethod def _get_root_mephisto_repo_dir() -> str: - return os.path.dirname(os.path.dirname(os.path.dirname(os.path.dirname( - os.path.abspath(__file__) - )))) + return os.path.dirname( + os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) + ) def _get_export_dir(self) -> str: root_dir = self._get_root_mephisto_repo_dir() @@ -114,6 +119,14 @@ def _prepare_dump_data( dump_data = randomize_ids_results["updated_dump"] self._pk_substitutions = randomize_ids_results["pk_substitutions"] + legacy_ids_found = any([v for v in self._pk_substitutions.values()]) + if not legacy_ids_found: + logger.info( + "Note that there was no need to randomize any ids, " + "because your Mephisto DB and provider-specific " + "datastores do not contain any legacy ids." + ) + return dump_data def _get_latest_migrations(self) -> Dict[str, Union[None, str]]: @@ -139,13 +152,14 @@ def _get_latest_migrations(self) -> Dict[str, Union[None, str]]: @staticmethod def _ask_user_if_they_are_sure() -> bool: - question = input( + console = Console() + question = console.input( "Are you sure? " "It will affect your databases and related files. " - "Type 'yes' and press Enter if you want to proceed: " + "Type '[green]yes[/green]' and press Enter if you want to proceed: " ) if question != "yes": - logger.info("Ok. Bye") + logger.info("Exiting now ...") exit() return True @@ -166,6 +180,7 @@ def export_dump( task_runs_labels: Optional[List[str]] = None, delete_exported_data: bool = False, randomize_legacy_ids: bool = False, + metadata_export_options: dict = None, verbosity: int = 0, ) -> dict: # 1. Protect from accidental launches @@ -173,15 +188,25 @@ def export_dump( self._ask_user_if_they_are_sure() # 2. 
Prepare dump data with Mephisto DB and provider datastores
+        logger.info(f"Started exporting data ...")
+
         since_datetime = None
         if task_runs_since_date:
             try:
                 since_datetime = serialize_date_to_python(task_runs_since_date)
             except Exception:
-                error_message = f"Could not parse date '{task_runs_since_date}'."
-                logger.exception(f"[red]{error_message}[/red]")
+                error_message = (
+                    f"Could not parse date '{task_runs_since_date}'. "
+                    f"Expected ISO 8601 format in UTC timezone."
+                    f"\n For example:"
+                    f"\n\t - 2024-01-24"
+                    f"\n\t - 2024-01-24T01:10:30"
+                )
+                logger.error(f"[red]{error_message}[/red]")
                 exit()
 
+        logger.info(f"Copying database records ...")
+
         dump_data_to_export = self._prepare_dump_data(
             task_names=task_names,
             task_ids=task_ids,
@@ -191,23 +216,27 @@
             randomize_legacy_ids=randomize_legacy_ids,
         )
 
-        # 3. Prepare export dirs and get dump file path
+        # 3. Prepare export dirs and get dump file path.
+        # JSON file is going to be located in tmp directory,
+        # where we add all related files and then archive them all together
         export_dir = self._get_export_dir()
         dump_timestamp = self._make_export_timestamp()
         dump_name = self._make_dump_name(dump_timestamp)
-        file_path = self._make_export_dump_file_path(export_dir, dump_name)
+        tmp_export_dir = export_dump.make_tmp_export_dir()
+        tmp_dump_json_file_path = self._make_export_dump_file_path(tmp_export_dir, dump_name)
 
         # 4. Prepare metadata
         metadata = {
             "migrations": self._get_latest_migrations(),
-            "export_parameters": {
-                "--export-indent": json_indent,
-                "--export-tasks-by-names": task_names,
-                "--export-tasks-by-ids": task_ids,
-                "--export-task-runs-by-ids": task_run_ids,
-                "--export-task-runs-since-date": task_runs_since_date,
-                "--verbosity": verbosity,
-            },
+            "export_options": metadata_export_options,
             "timestamp": dump_timestamp,
             "pk_substitutions": self._pk_substitutions,
         }
@@ -215,19 +244,19 @@
         # 5. Save JSON file
         try:
-            with open(file_path, "w") as f:
+            with open(tmp_dump_json_file_path, "w") as f:
                 f.write(json.dumps(dump_data_to_export, indent=json_indent))
         except Exception as e:
             # Remove file to not make a mess in export directory
             error_message = f"Could not create dump file {dump_data_to_export}. Reason: {str(e)}."
-
-            if verbosity:
-                logger.exception(f"[red]{error_message}[/red]")
-            os.remove(file_path)
+            logger.exception(f"[red]{error_message}[/red]")
+            os.remove(tmp_dump_json_file_path)
             exit()
 
+        logger.info(f"Copying database records finished")
+
         # 6. Archive files in file system
-        exported = backups.archive_and_copy_data_files(
+        exported = export_dump.archive_and_copy_data_files(
            self.db,
            export_dir,
            dump_name,
@@ -239,8 +268,13 @@
         # 7. Delete exported data if needed after backing data up
         backup_path = None
         if delete_exported_data:
+            logger.info(
+                f"Backing up your current data and removing exported data from the local data directory ..."
+            )
+
             backup_dir = self._get_backup_dir()
-            backup_path = backups.make_full_data_dir_backup(backup_dir, dump_timestamp)
+            backup_path = backups.make_backup_file_path_by_timestamp(backup_dir, dump_timestamp)
+            backups.make_full_data_dir_backup(backup_path)
             delete_tasks = bool(task_names or task_ids)
             is_partial_dump = bool(task_names or task_ids or task_run_ids or task_runs_since_date)
             dumps.delete_exported_data(
@@ -250,63 +284,77 @@
                 partial=is_partial_dump,
                 delete_tasks=delete_tasks,
             )
+            logger.info(f"Finished backing up current data and removing exported data")
 
         data_path = None
         if exported:
             data_path = os.path.join(
-                export_dir, f"{dump_name}.{backups.DEFAULT_ARCHIVE_FORMAT}",
+                export_dir,
+                f"{dump_name}.{DEFAULT_ARCHIVE_FORMAT}",
             )
 
         return {
-            "db_path": file_path,
-            "data_path": data_path,
+            "dump_path": data_path,
             "backup_path": backup_path,
         }
 
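The `dump_path`/`backup_path` pair returned above is what the `export` CLI command prints. A minimal sketch, not part of this changeset, of calling this method directly; the Task ids and printed paths are illustrative, and the porter asks for an interactive confirmation first.

```python
# A minimal sketch (not part of this changeset): exporting two Tasks by id.
# Assumes the default local Mephisto DB.
from mephisto.tools.db_data_porter.db_data_porter import DBDataPorter

porter = DBDataPorter()
export_results = porter.export_dump(
    task_ids=["1", "2"],
    randomize_legacy_ids=True,  # avoids PK collisions when the dump is merged later
    json_indent=2,
)
print(export_results["dump_path"])    # e.g. <repo>/outputs/export/<timestamp>_mephisto_dump.zip
print(export_results["backup_path"])  # None unless delete_exported_data=True is passed
```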
     def import_dump(
         self,
-        dump_file_name_or_path: str,
+        dump_archive_file_name_or_path: str,
         conflict_resolver_name: str,
-        label: Optional[str] = None,
+        labels: Optional[List[str]] = None,
         keep_import_metadata: Optional[bool] = None,
         verbosity: int = 0,
     ):
         # 1. Check dump file path
-        is_dump_path_full = os.path.isabs(dump_file_name_or_path)
+        if not dump_archive_file_name_or_path:
+            error_message = "Option `-f/--file` is required."
+            logger.error(f"[red]{error_message}[/red]")
+            exit()
+
+        is_dump_path_full = os.path.isabs(dump_archive_file_name_or_path)
         if not is_dump_path_full:
             root_dir = self._get_root_mephisto_repo_dir()
-            dump_file_name_or_path = os.path.join(
-                root_dir, EXPORT_OUTPUT_DIR, dump_file_name_or_path,
+            dump_archive_file_name_or_path = os.path.join(
+                root_dir,
+                EXPORT_OUTPUT_DIR,
+                dump_archive_file_name_or_path,
             )
 
-        if not os.path.exists(dump_file_name_or_path):
+        if not os.path.exists(dump_archive_file_name_or_path):
             error_message = (
-                f"Could not find dump file '{dump_file_name_or_path}'. "
-                f"Please, specify full path to existing file or "
-                f"just file name that is located in `/{EXPORT_OUTPUT_DIR}`."
+                f"Could not find dump file '{dump_archive_file_name_or_path}'. "
+                f"Please specify the full path to an existing file, or "
+                f"just the filename if it is located in /{EXPORT_OUTPUT_DIR}."
             )
-            if verbosity:
-                logger.exception(f"[red]{error_message}[/red]")
+            logger.error(f"[red]{error_message}[/red]")
             exit()
 
-        # 2. Read dump file
-        with open(dump_file_name_or_path, "r") as f:
-            try:
-                dump_file_data: dict = json.loads(f.read())
-            except Exception as e:
-                error_message = (
-                    f"Could not read JSON from dump file '{dump_file_name_or_path}'. "
-                    f"Please, check if it has the correct format. Reason: {str(e)}"
-                )
-                logger.exception(f"[red]{error_message}[/red]")
-                exit()
+        # 2. Read JSON dump file from archive
+        with zipfile.ZipFile(dump_archive_file_name_or_path) as archive:
+            dump_name = os.path.basename(os.path.splitext(dump_archive_file_name_or_path)[0])
+            json_dump_file_name = f"{dump_name}.json"
+
+            with archive.open(json_dump_file_name) as f:
+                try:
+                    dump_file_data: dict = json.loads(f.read())
+                except Exception as e:
+                    error_message = (
+                        f"Could not read JSON from dump file '{dump_archive_file_name_or_path}'. "
+                        f"Please check whether file '{json_dump_file_name}' inside it "
+                        f"has the correct format. Reason: {str(e)}"
+                    )
+                    logger.exception(f"[red]{error_message}[/red]")
+                    exit()
 
         # 3. Validate dump
         dump_data_errors = validate_dump_data(self.db, dump_file_data)
         if dump_data_errors:
             error_message = make_error_message(
-                "Your dump file has incorrect format", dump_data_errors, indent=4,
+                "Your dump file has incorrect format",
+                dump_data_errors,
+                indent=4,
             )
             logger.error(f"[red]{error_message}[/red]")
             exit()
@@ -326,19 +374,28 @@ def import_dump(
         )
         backup_dir = self._get_backup_dir()
         dump_timestamp = self._make_export_timestamp()
-        backup_path = backups.make_full_data_dir_backup(backup_dir, dump_timestamp)
+        backup_path = backups.make_backup_file_path_by_timestamp(backup_dir, dump_timestamp)
+        backups.make_full_data_dir_backup(backup_path)
         logger.info(f"Backup was created successfully! File: '{backup_path}'")
 
         # 7. Write dump data into local DBs
+        logger.info(f"Started importing from dump file {dump_archive_file_name_or_path} ...")
+
+        imported_task_runs_number = 0
+
         for db_or_datastore_name, db_or_datastore_data in dump_file_data.items():
+            # Pop `imported_data` from dump content, to merge it into local `imported_data`
+            # when option `--keep-import-metadata` is passed
             imported_data_from_dump = []
             if db_or_datastore_name == MEPHISTO_DUMP_KEY:
                 # Main Mephisto database
                 db = self.db
                 imported_data_from_dump = dump_file_data.get(MEPHISTO_DUMP_KEY, {}).pop(
-                    IMPORTED_DATA_TABLE_NAME, [],
+                    IMPORTED_DATA_TABLE_NAME,
+                    [],
                 )
+                imported_task_runs_number = len(db_or_datastore_data.get("task_runs", []))
             else:
                 # Provider's datastore.
                 # NOTE: It is being created if it does not exist (yes, here, magically)
@@ -354,22 +411,22 @@
                 db = datastore
 
             if verbosity:
-                logger.info(f"Start importing into `{db_or_datastore_name}` database")
+                logger.debug(f"Start importing into `{db_or_datastore_name}` database")
 
-            label = label or self._get_label_from_file_path(dump_file_name_or_path)
+            labels = labels or [self._get_label_from_file_path(dump_archive_file_name_or_path)]
             import_single_db_results = import_dump.import_single_db(
                 db=db,
                 provider_type=db_or_datastore_name,
                 dump_data=db_or_datastore_data,
                 conflict_resolver_name=conflict_resolver_name,
-                label=label,
+                labels=labels,
                 verbosity=verbosity,
             )
 
             errors = import_single_db_results["errors"]
             if errors:
-                error_message = make_error_message("Import was not processed", errors, indent=4)
+                error_message = make_error_message("Nothing was imported", errors, indent=4)
                 logger.error(f"[red]{error_message}[/red]")
 
                 # Simulating rollback for all databases/datastores and related data files
@@ -377,60 +434,89 @@
                 backup_path = backups.make_backup_file_path_by_timestamp(backup_dir, dump_timestamp)
 
                 if verbosity:
-                    logger.info(f"Rolling back all changed from backup '{backup_path}'")
+                    logger.debug(f"Rolling back all changes from backup {backup_path} ...")
 
                 backups.restore_from_backup(backup_path, mephisto_data_path)
 
+                if verbosity:
+                    logger.debug(f"Rolling back finished")
+
                 exit()
 
-            # Write imformation in `imported_data`
             if db_or_datastore_name == MEPHISTO_DUMP_KEY:
+                # Unpack files related to the imported TaskRuns
+                dump_archive_file_path = (
+                    os.path.splitext(dump_archive_file_name_or_path)[0]
+                    + f".{DEFAULT_ARCHIVE_FORMAT}"
+                )
+                export_dump.unarchive_data_files(dump_archive_file_path, verbosity=verbosity)
+
+                # Write information in `imported_data`
                 # Fill `imported_data` table with imported dump
                 import_dump.fill_imported_data_with_imported_dump(
                     db=db,
                     imported_data=import_single_db_results["imported_data"],
-                    source_file_name=os.path.basename(dump_file_name_or_path),
+                    source_file_name=os.path.basename(dump_archive_file_name_or_path),
+                    verbosity=verbosity,
                 )
 
                 # Fill `imported_data` with information from `imported_data` from dump
                 if keep_import_metadata and imported_data_from_dump:
-                    import_dump.import_table_imported_data_from_dump(db, imported_data_from_dump)
+                    import_dump.import_table_imported_data_from_dump(
+                        db,
+                        imported_data_from_dump,
+                        verbosity=verbosity,
+                    )
 
             if verbosity:
-                logger.info(
+                logger.debug(
                     f"Finished importing into `{db_or_datastore_name}` database successfully!"
                 )
 
-    def make_backup(self) -> str:
+        return {
+            "imported_task_runs_number": imported_task_runs_number,
+        }
+
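A minimal sketch, not part of this changeset, of a backup/restore round trip using the two methods defined next; note that `restore_from_backup` prompts for a literal `yes` before overwriting local data.

```python
# A minimal sketch (not part of this changeset): create a full backup,
# then restore from that same archive.
from mephisto.tools.db_data_porter.db_data_porter import DBDataPorter

porter = DBDataPorter()
backup_path = porter.create_backup(verbosity=1)
porter.restore_from_backup(backup_file_name_or_path=backup_path, verbosity=1)
```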
+    def create_backup(self, verbosity: int = 0) -> str:
         backup_dir = self._get_backup_dir()
         dump_timestamp = self._make_export_timestamp()
-        backup_path = backups.make_full_data_dir_backup(backup_dir, dump_timestamp)
+        backup_path = backups.make_backup_file_path_by_timestamp(backup_dir, dump_timestamp)
+
+        logger.info(f"Creating backup file ...")
+
+        backups.make_full_data_dir_backup(backup_path)
         return backup_path
 
     def restore_from_backup(self, backup_file_name_or_path: str, verbosity: int = 0):
-        # 1. Protect from accidental launches
-        self._ask_user_if_they_are_sure()
+        # 1. Check backup file path
+        if not backup_file_name_or_path:
+            error_message = "Option `-f/--file` is required."
+            logger.error(f"[red]{error_message}[/red]")
+            exit()
 
-        # 2. Check backup file path
         is_backup_path_full = os.path.isabs(backup_file_name_or_path)
         if not is_backup_path_full:
             root_dir = self._get_root_mephisto_repo_dir()
             backup_file_name_or_path = os.path.join(
-                root_dir, BACKUP_OUTPUT_DIR, backup_file_name_or_path,
+                root_dir,
+                BACKUP_OUTPUT_DIR,
+                backup_file_name_or_path,
             )
 
         if not os.path.exists(backup_file_name_or_path):
             error_message = (
-                f"Could not find backup file '{backup_file_name_or_path}'. "
-                f"Please, specify full path to existing file or "
-                f"just file name that is located in `/{BACKUP_OUTPUT_DIR}`."
+                f"Could not find backup file {backup_file_name_or_path}. "
+                f"Please specify the full path to an existing file, or "
+                f"just the filename if it is located in /{BACKUP_OUTPUT_DIR}."
             )
-            logger.exception(f"[red]{error_message}[/red]")
+            logger.error(f"[red]{error_message}[/red]")
             exit()
 
-        if verbosity and not is_backup_path_full:
-            logger.info(f"Found backup file '{backup_file_name_or_path}'")
+        # 2. Protect from accidental launches
+        self._ask_user_if_they_are_sure()
 
         # 3. Restore
+        logger.info(f"Started restoring from backup {backup_file_name_or_path} ...")
+
         mephisto_data_path = get_data_dir()
         backups.restore_from_backup(backup_file_name_or_path, mephisto_data_path)
diff --git a/mephisto/tools/db_data_porter/dumps.py b/mephisto/tools/db_data_porter/dumps.py
index 34618ace0..dfb5952ba 100644
--- a/mephisto/tools/db_data_porter/dumps.py
+++ b/mephisto/tools/db_data_porter/dumps.py
@@ -26,6 +26,28 @@
 logger = ConsoleWriter()
 
 
+def _make_options_error_message(
+    title: str,
+    values: List[str],
+    not_found_values: List[str],
+    available_values: Optional[List[str]] = None,
+) -> str:
+    available_values_string = ""
+    if available_values:
+        available_values_string = (
+            f"\nThere are {len(available_values)} available values: {', '.join(available_values)}"
+        )
+
+    return (
+        f"[red]"
+        f"You provided incorrect {title}. "
+        f"\nProvided {len(values)} values: {', '.join(values)}. "
+        f"\n{len(not_found_values)} values not found: {', '.join(not_found_values)}."
+ f"{available_values_string}" + f"[/red]" + ) + + def prepare_partial_dump_data( db: "MephistoDB", task_names: Optional[List[str]] = None, @@ -42,19 +64,97 @@ def prepare_partial_dump_data( if not task_run_ids: if task_names or task_ids: if task_names: + # Validate on correct values of passed Task names + db_tasks = db_utils.select_rows_by_list_of_field_values( + db, + "tasks", + ["task_name"], + [task_names], + ) + if len(task_names) != len(db_tasks): + db_task_names = [t["task_name"] for t in db_tasks] + not_found_values = [t for t in task_names if t not in db_task_names] + logger.error( + _make_options_error_message("Task names", task_names, not_found_values) + ) + exit() + + # Get Task IDs by their names task_ids = db_utils.get_task_ids_by_task_names(db, task_names) + else: + # Validate on correct values of passed Task IDs + db_tasks = db_utils.select_rows_by_list_of_field_values( + db, + "tasks", + ["task_id"], + [task_ids], + ) + if len(task_ids) != len(db_tasks): + db_task_ids = [t["task_id"] for t in db_tasks] + not_found_values = [t for t in task_ids if t not in db_task_ids] + logger.error( + _make_options_error_message("Task IDs", task_ids, not_found_values) + ) + exit() + task_ids = task_ids or [] + + # Get TaskRun IDs by Task IDs task_run_ids = db_utils.get_task_run_ids_ids_by_task_ids(db, task_ids) elif task_runs_labels: + # Validate on correct values of passed TaskRun labels + db_labels = db_utils.get_list_of_available_labels(db) + not_found_values = [t for t in task_runs_labels if t not in db_labels] + if not_found_values: + logger.error( + _make_options_error_message( + "TaskRun labels", + task_runs_labels, + not_found_values, + db_labels, + ) + ) + exit() + + # Get TaskRun IDs task_run_ids = db_utils.get_task_run_ids_ids_by_labels(db, task_runs_labels) elif since_datetime: + # Get TaskRun IDs task_run_ids = db_utils.select_task_run_ids_since_date(db, since_datetime) - logger.info(f"Run command for TaskRun IDs: {', '.join(task_run_ids)}.") + if not task_run_ids: + logger.error( + f"Nothing to export - " + f"no TaskRuns found that were created after {since_datetime}" + ) + exit() + else: + # Validate on correct values of passed TaskRun IDs + db_task_runs = db_utils.select_rows_by_list_of_field_values( + db, + "task_runs", + ["task_run_id"], + [task_run_ids], + ) + if len(task_run_ids) != len(db_task_runs): + db_task_run_ids = [t["task_run_id"] for t in db_task_runs] + not_found_values = [t for t in task_run_ids if t not in db_task_run_ids] + logger.error(_make_options_error_message("TaskRun IDs", task_run_ids, not_found_values)) + exit() + + if task_run_ids: + logger.info(f"Run command for TaskRun IDs: {', '.join(task_run_ids)}.") + else: + logger.error("[yellow]Nothing to export - no TaskRuns found[/yellow]") + exit() mephisto_db_data = db_utils.mephisto_db_to_dict_for_task_runs(db, task_run_ids) dump_data_to_export[MEPHISTO_DUMP_KEY] = mephisto_db_data + if not mephisto_db_data.get("task_runs"): + logger.error("[yellow]Nothing to export - no TaskRuns found[/yellow]") + exit() + # Providers' DBs provider_types = [i["provider_type"] for i in mephisto_db_data["requesters"]] @@ -68,11 +168,14 @@ def prepare_partial_dump_data( # There is a provider-specific logic of exporting DB data as it can have any scheme. 
# It can be missed and not implemented at all datastore_export_method: MethodType = getattr( - provider_datastore, DATASTORE_EXPORT_METHOD_NAME, None, + provider_datastore, + DATASTORE_EXPORT_METHOD_NAME, + None, ) if datastore_export_method: datastore_export_data = datastore_export_method( - task_run_ids=task_run_ids, mephisto_db_data=mephisto_db_data, + task_run_ids=task_run_ids, + mephisto_db_data=mephisto_db_data, ) else: # If method was not implemented in provider datastore, we export all tables fully. @@ -93,7 +196,7 @@ def prepare_partial_dump_data( def prepare_full_dump_data(db: "MephistoDB", provider_datastores: Dict[str, "MephistoDB"]) -> dict: dump_data_to_export = {} - logger.info(f"Run command for all TaskRuns.") + logger.info(f"No filter for TaskRuns specified - exporting all TaskRuns.") # Mephisto DB dump_data_to_export[MEPHISTO_DUMP_KEY] = db_utils.db_or_datastore_to_dict(db) @@ -146,7 +249,8 @@ def delete_exported_data( # Get directories related to dumped TaskRuns task_run_rows = dump_data_to_export.get(MEPHISTO_DUMP_KEY, {}).get( - TASK_RUNS_TABLE_NAME, [], + TASK_RUNS_TABLE_NAME, + [], ) task_runs_pk_field_name = db_utils.get_table_pk_field_name(db, TASK_RUNS_TABLE_NAME) task_run_ids = [r[task_runs_pk_field_name] for r in task_run_rows] @@ -159,7 +263,9 @@ def delete_exported_data( # Clean DB db_utils.delete_exported_data_without_fk_constraints( - db, dump_data_to_export[MEPHISTO_DUMP_KEY], names_of_tables_to_cleanup, + db, + dump_data_to_export[MEPHISTO_DUMP_KEY], + names_of_tables_to_cleanup, ) # Clean related files diff --git a/mephisto/tools/db_data_porter/export_dump.py b/mephisto/tools/db_data_porter/export_dump.py new file mode 100644 index 000000000..98e030904 --- /dev/null +++ b/mephisto/tools/db_data_porter/export_dump.py @@ -0,0 +1,226 @@ +#!/usr/bin/env python3 + +# Copyright (c) Meta Platforms and its affiliates. +# This source code is licensed under the MIT license found in the +# LICENSE file in the root directory of this source tree. 
+ +import os +import shutil +from distutils.dir_util import copy_tree +from pathlib import Path +from typing import Any +from typing import Dict +from typing import List + +from rich_click import RichContext + +from mephisto.data_model.task_run import TaskRun +from mephisto.tools.db_data_porter.constants import AGENTS_TABLE_NAME +from mephisto.tools.db_data_porter.constants import ASSIGNMENTS_TABLE_NAME +from mephisto.tools.db_data_porter.constants import DEFAULT_ARCHIVE_FORMAT +from mephisto.tools.db_data_porter.constants import MEPHISTO_DUMP_KEY +from mephisto.tools.db_data_porter.constants import TASK_RUNS_TABLE_NAME +from mephisto.tools.db_data_porter.randomize_ids import get_old_pk_from_substitutions +from mephisto.utils import db as db_utils +from mephisto.utils.console_writer import ConsoleWriter +from mephisto.utils.dirs import get_data_dir +from mephisto.utils.dirs import get_mephisto_tmp_dir + +logger = ConsoleWriter() + + +def make_tmp_export_dir() -> str: + tmp_dir = get_mephisto_tmp_dir() + tmp_export_dir = os.path.join(tmp_dir, "export") + os.makedirs(tmp_export_dir, exist_ok=True) + return tmp_export_dir + + +def _rename_dirs_with_new_pks(task_run_dirs: List[str], pk_substitutions: dict): + def rename_dir_with_new_pk(dir_path: str, substitutions: dict) -> str: + dump_id = substitutions.get(os.path.basename(dir_path)) + renamed_dir_path = os.path.join(os.path.dirname(dir_path), dump_id) + os.rename(dir_path, renamed_dir_path) + return renamed_dir_path + + task_runs_subs = pk_substitutions.get(MEPHISTO_DUMP_KEY, {}).get(TASK_RUNS_TABLE_NAME, {}) + if not task_runs_subs: + # Nothing to rename + return + + assignment_subs = pk_substitutions.get(MEPHISTO_DUMP_KEY, {}).get(ASSIGNMENTS_TABLE_NAME, {}) + agent_subs = pk_substitutions.get(MEPHISTO_DUMP_KEY, {}).get(AGENTS_TABLE_NAME, {}) + + task_run_dirs = [d for d in task_run_dirs if os.path.basename(d) in task_runs_subs.keys()] + for task_run_dir in task_run_dirs: + # Rename TaskRun dir + renamed_task_run_dir = rename_dir_with_new_pk(task_run_dir, task_runs_subs) + + # Rename Assignments dirs + assignments_dirs = [ + os.path.join(renamed_task_run_dir, d) + for d in os.listdir(renamed_task_run_dir) + if d in assignment_subs.keys() + ] + for assignment_dir in assignments_dirs: + renamed_assignment_dir = rename_dir_with_new_pk(assignment_dir, assignment_subs) + + # Rename Agents dirs + agents_dirs = [ + os.path.join(renamed_assignment_dir, d) + for d in os.listdir(renamed_assignment_dir) + if d in agent_subs.keys() + ] + for agent_dir in agents_dirs: + rename_dir_with_new_pk(agent_dir, agent_subs) + + +def _export_data_dir_for_task_runs( + input_dir_path: str, + archive_file_path_without_ext: str, + task_runs: List[TaskRun], + pk_substitutions: dict, + _format: str = DEFAULT_ARCHIVE_FORMAT, + verbosity: int = 0, +) -> bool: + tmp_export_dir = make_tmp_export_dir() + + task_run_data_dirs = [i.get_run_dir() for i in task_runs] + if not task_run_data_dirs: + return False + + try: + tmp_task_run_dirs = [] + + # Copy all files for passed TaskRuns into tmp dir + for task_run_data_dir in task_run_data_dirs: + relative_dir = Path(task_run_data_dir).relative_to(input_dir_path) + tmp_task_run_dir = os.path.join(tmp_export_dir, relative_dir) + + tmp_task_run_dirs.append(tmp_task_run_dir) + + os.makedirs(tmp_task_run_dir, exist_ok=True) + copy_tree(task_run_data_dir, tmp_task_run_dir, verbose=0) + + _rename_dirs_with_new_pks(tmp_task_run_dirs, pk_substitutions) + + # Create archive in export dir + shutil.make_archive( + 
base_name=archive_file_path_without_ext,
+            format="zip",
+            root_dir=tmp_export_dir,
+        )
+    finally:
+        # Remove tmp dir
+        if os.path.exists(tmp_export_dir):
+            shutil.rmtree(tmp_export_dir)
+
+    return True
+
+
+def archive_and_copy_data_files(
+    db: "MephistoDB",
+    export_dir: str,
+    dump_name: str,
+    dump_data: dict,
+    pk_substitutions: dict,
+    _format: str = DEFAULT_ARCHIVE_FORMAT,
+    verbosity: int = 0,
+) -> bool:
+    mephisto_data_files_path = os.path.join(get_data_dir(), "data")
+    output_zip_file_base_name = os.path.join(export_dir, dump_name)  # name without extension
+
+    if verbosity:
+        logger.debug(f"Archiving data files started ...")
+
+    # Get TaskRuns for PKs in dump
+    task_runs: List[TaskRun] = []
+    task_runs_ids: List[str] = []
+    for dump_task_run in dump_data[MEPHISTO_DUMP_KEY][TASK_RUNS_TABLE_NAME]:
+        task_runs_pk_field_name = db_utils.get_table_pk_field_name(db, TASK_RUNS_TABLE_NAME)
+        dump_pk = dump_task_run[task_runs_pk_field_name]
+        db_pk = get_old_pk_from_substitutions(dump_pk, pk_substitutions, TASK_RUNS_TABLE_NAME)
+        db_pk = db_pk or dump_pk
+        task_run: TaskRun = TaskRun.get(db, db_pk)
+        task_runs.append(task_run)
+        task_runs_ids.append(db_pk)
+
+    if verbosity:
+        logger.debug(f"Archiving data files for TaskRuns: {', '.join(task_runs_ids)}")
+
+    # Export archived data files related to the TaskRuns from the dump
+    exported = _export_data_dir_for_task_runs(
+        input_dir_path=mephisto_data_files_path,
+        archive_file_path_without_ext=output_zip_file_base_name,
+        task_runs=task_runs,
+        pk_substitutions=pk_substitutions,
+        _format=_format,
+        verbosity=verbosity,
+    )
+
+    if verbosity:
+        logger.debug(f"Archiving data files finished")
+
+    return exported
+
+
+def unarchive_data_files(
+    dump_file_path: str,
+    _format: str = DEFAULT_ARCHIVE_FORMAT,
+    verbosity: int = 0,
+):
+    # Local directory with data files for TaskRuns
+    mephisto_data_files_path = os.path.join(get_data_dir(), "data")
+    mephisto_data_runs_path = os.path.join(mephisto_data_files_path, "runs")
+
+    # Tmp directory into which TaskRun data files from the dump will be unarchived
+    tmp_dir = get_mephisto_tmp_dir()
+    tmp_unarchive_dir = os.path.join(tmp_dir, "unarchive")
+    tmp_unarchive_task_runs_dir = os.path.join(tmp_unarchive_dir, "runs")
+
+    try:
+        # Unarchive into tmp directory
+        if verbosity:
+            logger.debug("Unpacking TaskRuns files ...")
+
+        shutil.unpack_archive(
+            filename=dump_file_path,
+            extract_dir=tmp_unarchive_dir,
+            format=_format,
+        )
+
+        if verbosity:
+            logger.debug("Unpacking TaskRuns files finished")
+
+        # Copy files
+        if verbosity:
+            logger.debug(f"Copying TaskRuns files into {mephisto_data_runs_path} ...")
+
+        copy_tree(tmp_unarchive_task_runs_dir, mephisto_data_runs_path, verbose=0)
+
+        if verbosity:
+            logger.debug("Copying TaskRuns files finished")
+    except Exception as e:
+        logger.exception(f"Could not unpack TaskRuns files from dump. Error: {e}")
+        exit()
+    finally:
+        # Remove tmp dir with dump data files
+        if verbosity:
+            logger.debug("Removing unpacked TaskRuns files ...")
+
+        if os.path.exists(tmp_unarchive_dir):
+            shutil.rmtree(tmp_unarchive_dir)
+
+        if verbosity:
+            logger.debug("Removing unpacked TaskRuns files finished")
+
+
+def get_export_options_for_metadata(ctx: RichContext, options: dict) -> Dict[str, Any]:
+    export_options_for_metadata = {}
+
+    for param in ctx.command.params:
+        option_name = "/".join(param.opts)  # Concatenated option name variants (short/full)
+        values = options[param.name]
+        export_options_for_metadata[option_name] = values
+
+    return export_options_for_metadata
diff --git 
a/mephisto/tools/db_data_porter/import_dump.py b/mephisto/tools/db_data_porter/import_dump.py index 7390d928c..d2771f920 100644 --- a/mephisto/tools/db_data_porter/import_dump.py +++ b/mephisto/tools/db_data_porter/import_dump.py @@ -66,7 +66,7 @@ def import_single_db( provider_type: str, dump_data: dict, conflict_resolver_name: str, - label: str, + labels: List[str], verbosity: int = 0, ) -> ImportSingleDBsType: # Results of the function @@ -117,12 +117,11 @@ def import_single_db( # Imported data vars imported_data_needs_to_be_updated = ( - provider_type == MEPHISTO_DUMP_KEY and - table_name in IMPORTED_DATA_TABLE_NAMES + provider_type == MEPHISTO_DUMP_KEY and table_name in IMPORTED_DATA_TABLE_NAMES ) - newly_imported_labels = json.dumps([label]) - conflicted_labels = json.dumps([LOCAL_DB_LABEL, label]) + newly_imported_labels = json.dumps(sorted(labels)) + conflicted_labels = json.dumps(sorted([LOCAL_DB_LABEL, *labels])) imported_data_for_table = { newly_imported_labels: [], conflicted_labels: [], @@ -146,7 +145,10 @@ def import_single_db( imported_data_conflicted_row = False _update_row_with_pks_from_resolvings_mappings( - db, table_name, dump_row, resolvings_mapping, + db, + table_name, + dump_row, + resolvings_mapping, ) # Table with non-PK unique field @@ -169,7 +171,7 @@ def import_single_db( # If local DB does not have this row if not existing_rows: if verbosity: - logger.info(f"Inserting new row into table '{table_name}': {dump_row}") + logger.debug(f"Inserting new row into table '{table_name}': {dump_row}") db_utils.insert_new_row_in_table(db, table_name, dump_row) @@ -180,17 +182,30 @@ def import_single_db( existing_db_row = existing_rows[-1] if verbosity: - logger.info( + logger.debug( f"Conflicts during inserting row in table '{table_name}': " f"{dump_row}. " f"Existing row in your database: {existing_db_row}" ) resolved_conflicting_row = conflict_resolver_name.resolve( - table_name, table_pk_field_name, existing_db_row, dump_row, + table_name, + table_pk_field_name, + existing_db_row, + dump_row, ) + + if verbosity: + logger.debug( + f"Resolving finished successfully. " + f"Chosen row: {resolved_conflicting_row}" + ) + db_utils.update_row_in_table( - db, table_name, resolved_conflicting_row, table_pk_field_name, + db, + table_name, + resolved_conflicting_row, + table_pk_field_name, ) # Saving resolved a pair of PKs @@ -205,6 +220,9 @@ def import_single_db( # Regular table. 
Create new row as is else: + if verbosity: + logger.debug(f"Inserting new row into table '{table_name}': {dump_row}") + db_utils.insert_new_row_in_table(db, table_name, dump_row) # Update table lists of Imported data @@ -214,10 +232,12 @@ def import_single_db( else: _label = newly_imported_labels - imported_data_for_table[_label].append({ - UNIQUE_FIELD_NAMES: unique_field_names or [table_pk_field_name], - UNIQUE_FIELD_VALUES: imported_data_row_unique_field_values, - }) + imported_data_for_table[_label].append( + { + UNIQUE_FIELD_NAMES: unique_field_names or [table_pk_field_name], + UNIQUE_FIELD_VALUES: imported_data_row_unique_field_values, + } + ) # Add table into Imported data if imported_data_needs_to_be_updated: @@ -228,38 +248,45 @@ def import_single_db( if provider_type == MEPHISTO_DUMP_KEY: for unit_id, agent_id in units_agents.items(): db_utils.update_row_in_table( - db, "units", {"unit_id": unit_id, "agent_id": agent_id}, "unit_id", + db, + "units", + {"unit_id": unit_id, "agent_id": agent_id}, + "unit_id", ) # --- HACK (#UNIT.AGENT_ID) END #3: except Exception as e: + error_message_ending = "" + # Custom error message in cases when we can guess what happens # using small info SQLite gives us possible_issue = "" if in_progress_table_pk_field_name in str(e) and "UNIQUE constraint" in str(e): pk_value = in_progress_dump_row[in_progress_table_pk_field_name] + error_message_ending = ( + f". Local database already has Primary Key '{pk_value}' " + f"in table '{in_progress_table_name}'." + ) possible_issue = ( - f"\nPossible issue: " - f"Local database already have Primary Key '{pk_value}' " - f"in table '{in_progress_table_name}'. " - f"Maybe you are trying to run already merged dump file. " - f"Or if you have old databases, you may bump into same Primary Keys. " - f"If you are sure that all data from this dump is unique and " - f"still have access to the dumped project, " - f"try to create dump with parameter `--randomize-legacy-ids` " - f"and start importing again." + f"\n\n[bold]Possible issue:[/bold] " + f"You may be trying to import an already imported dump file. " + f"Or this could be related to legacy auto-increment database primary keys " + f"(in which case the dump needs to be re-created " + f"with -r/--randomize-legacy-ids option)." ) - default_error_message_beginning = "" + error_message_beginning = "" if not possible_issue: - default_error_message_beginning = "Unexpected error happened: " + error_message_beginning = "Unexpected error happened: " errors.append( - f"{default_error_message_beginning}{e}." + f"{error_message_beginning}{e}{error_message_ending}" f"{possible_issue}" - f"\nProvider: {provider_type}." - f"\nTable: {in_progress_table_name}." - f"\nRow: {json.dumps(in_progress_dump_row, indent=2)}." + f"\n" + f"\n[bold]Provider:[/bold] {provider_type}." + f"\n[bold]Table:[/bold] {in_progress_table_name}." + f"\n[bold]Row:[/bold]\n{json.dumps(in_progress_dump_row, indent=2)}." 
+ f"\n" ) return { @@ -269,8 +296,15 @@ def import_single_db( def fill_imported_data_with_imported_dump( - db: "MephistoDB", imported_data: dict, source_file_name: str, + db: "MephistoDB", + imported_data: dict, + source_file_name: str, + verbosity: int = 0, ): + if verbosity: + if imported_data: + logger.debug("Saving information about imported data ...") + for table_name, table_info in imported_data.items(): for labels, labels_rows in table_info.items(): for row in labels_rows: @@ -291,8 +325,22 @@ def fill_imported_data_with_imported_dump( }, ) + if verbosity: + if imported_data: + logger.debug("Saving information about imported data finished") + + +def import_table_imported_data_from_dump( + db: "MephistoDB", + imported_data_rows: List[dict], + verbosity: int = 0, +): + if verbosity: + if imported_data_rows: + logger.debug( + "Updating local information about imported data with imported data from dump ..." + ) -def import_table_imported_data_from_dump(db: "MephistoDB", imported_data_rows: List[dict]): for row in imported_data_rows: table_name = row["table_name"] unique_field_names = row["unique_field_names"] @@ -316,13 +364,19 @@ def import_table_imported_data_from_dump(db: "MephistoDB", imported_data_rows: L # Update existing row if existing_row: + if verbosity: + logger.debug(f"Updating already existing row for `{table_name}`: {existing_row}") + # Merge existing labels with from imported row existing_data_labels = json.loads(existing_row["data_labels"]) existing_data_labels += importing_data_labels - existing_row["data_labels"] = json.dumps(list(set(existing_data_labels))) + existing_row["data_labels"] = json.dumps(sorted(list(set(existing_data_labels)))) db_utils.update_row_in_table( - db=db, table_name="imported_data", row=existing_row, pk_field_name="id", + db=db, + table_name="imported_data", + row=existing_row, + pk_field_name="id", ) # Create new row @@ -331,4 +385,14 @@ def import_table_imported_data_from_dump(db: "MephistoDB", imported_data_rows: L row.pop("id", None) row["data_labels"] = json.dumps(data_labels_without_local) + if verbosity: + logger.debug(f"Inserting new row for `{table_name}`: {row}") + db_utils.insert_new_row_in_table(db=db, table_name="imported_data", row=row) + + if verbosity: + if imported_data_rows: + logger.debug( + "Updating local information about imported data " + "with imported data from dump finished" + ) diff --git a/mephisto/tools/db_data_porter/randomize_ids.py b/mephisto/tools/db_data_porter/randomize_ids.py index ab81b75a9..fbb48fbca 100644 --- a/mephisto/tools/db_data_porter/randomize_ids.py +++ b/mephisto/tools/db_data_porter/randomize_ids.py @@ -30,7 +30,9 @@ class RandomizedIDsType(TypedDict): def _randomize_ids_for_mephisto( - db: "MephistoDB", mephisto_dump: dict, legacy_only: bool = False, + db: "MephistoDB", + mephisto_dump: dict, + legacy_only: bool = False, ) -> DBPKSubstitutionsType: table_names = [t for t in mephisto_dump.keys() if t not in [IMPORTED_DATA_TABLE_NAME]] @@ -152,7 +154,9 @@ def _randomize_ids_for_provider( def randomize_ids( - db: "MephistoDB", full_dump: dict, legacy_only: bool = False, + db: "MephistoDB", + full_dump: dict, + legacy_only: bool = False, ) -> RandomizedIDsType: pk_substitutions: PKSubstitutionsType = {} @@ -166,7 +170,9 @@ def randomize_ids( for provider_type in provider_types: provider_dump = full_dump[provider_type] randomized_ids_for_provider = _randomize_ids_for_provider( - provider_type, provider_dump, mephisto_pk_substitutions, + provider_type, + provider_dump, + mephisto_pk_substitutions, ) 
if randomized_ids_for_provider: @@ -179,7 +185,9 @@ def randomize_ids( def get_old_pk_from_substitutions( - pk: str, substitutions: dict, table_name: str, + pk: str, + substitutions: dict, + table_name: str, ) -> str: # After we created a dump file, we already can have new randomized PKs. # But we still have old ones in Mephisto DB. diff --git a/mephisto/tools/db_data_porter/validation.py b/mephisto/tools/db_data_porter/validation.py index 853ad749f..64f3c67ae 100644 --- a/mephisto/tools/db_data_porter/validation.py +++ b/mephisto/tools/db_data_porter/validation.py @@ -78,8 +78,8 @@ def validate_dump_data(db: "MephistoDB", dump_data: dict) -> Optional[List[str]] ) continue - incorrect_field_names = list(filter( - lambda fn: not isinstance(fn, str), table_row.keys()) + incorrect_field_names = list( + filter(lambda fn: not isinstance(fn, str), table_row.keys()) ) if incorrect_field_names: errors.append( diff --git a/mephisto/utils/db.py b/mephisto/utils/db.py index 43eb7f374..3828edbff 100644 --- a/mephisto/utils/db.py +++ b/mephisto/utils/db.py @@ -29,6 +29,7 @@ # --- Exceptions --- + class MephistoDBException(Exception): pass @@ -51,6 +52,7 @@ class EntryDoesNotExistException(MephistoDBException): # --- Functions --- + def _select_all_rows_from_table(db: "MephistoDB", table_name: str) -> List[dict]: with db.table_access_condition, db.get_connection() as conn: c = conn.cursor() @@ -60,13 +62,17 @@ def _select_all_rows_from_table(db: "MephistoDB", table_name: str) -> List[dict] def _select_rows_from_table_related_to_task( - db: "MephistoDB", table_name: str, task_ids: List[str], + db: "MephistoDB", + table_name: str, + task_ids: List[str], ) -> List[dict]: return select_rows_by_list_of_field_values(db, table_name, ["task_id"], [task_ids]) def select_rows_from_table_related_to_task_run( - db: "MephistoDB", table_name: str, task_run_ids: List[str], + db: "MephistoDB", + table_name: str, + task_run_ids: List[str], ) -> List[dict]: return select_rows_by_list_of_field_values(db, table_name, ["task_run_id"], [task_run_ids]) @@ -107,7 +113,7 @@ def get_task_ids_by_task_names(db: "MephistoDB", task_names: List[str]) -> List[ task_names_string = ",".join([f"'{s}'" for s in task_names]) c.execute( f""" - SELECT task_id FROM tasks + SELECT task_id FROM tasks WHERE task_name IN ({task_names_string}); """ ) @@ -121,7 +127,7 @@ def get_task_run_ids_ids_by_task_ids(db: "MephistoDB", task_ids: List[str]) -> L task_ids_string = ",".join([f"'{s}'" for s in task_ids]) c.execute( f""" - SELECT task_run_id FROM task_runs + SELECT task_run_id FROM task_runs WHERE task_id IN ({task_ids_string}); """ ) @@ -141,7 +147,7 @@ def get_task_run_ids_ids_by_labels(db: "MephistoDB", labels: List[str]) -> List[ c.execute( f""" - SELECT unique_field_values FROM imported_data + SELECT unique_field_values FROM imported_data WHERE table_name = 'task_runs' {where_labels_string}; """ ) @@ -163,9 +169,7 @@ def get_table_pk_field_name(db: "MephistoDB", table_name: str): """ with db.table_access_condition, db.get_connection() as conn: c = conn.cursor() - c.execute( - f"SELECT name FROM pragma_table_info('{table_name}') WHERE pk;" - ) + c.execute(f"SELECT name FROM pragma_table_info('{table_name}') WHERE pk;") table_unique_field_name = c.fetchone()["name"] return table_unique_field_name @@ -173,9 +177,7 @@ def get_table_pk_field_name(db: "MephistoDB", table_name: str): def select_all_table_rows(db: "MephistoDB", table_name: str) -> List[dict]: with db.table_access_condition, db.get_connection() as conn: c = conn.cursor() - c.execute( 
- f"SELECT * FROM {table_name};" - ) + c.execute(f"SELECT * FROM {table_name};") rows = c.fetchall() return [dict(row) for row in rows] @@ -207,10 +209,12 @@ def select_rows_by_list_of_field_values( _field_values = field_values[i] field_values_string = ",".join([f"'{s}'" for s in _field_values]) where_list.append([field_name, field_values_string]) - where_string = " AND ".join([ - f"{field_name} IN ({field_values_string})" - for field_name, field_values_string in where_list - ]) + where_string = " AND ".join( + [ + f"{field_name} IN ({field_values_string})" + for field_name, field_values_string in where_list + ] + ) # Combine ORDER BY statement order_by_string = "" @@ -221,7 +225,7 @@ def select_rows_by_list_of_field_values( c.execute( f""" - SELECT * FROM {table_name} + SELECT * FROM {table_name} WHERE {where_string} {order_by_string}; """ @@ -232,15 +236,15 @@ def select_rows_by_list_of_field_values( def delete_exported_data_without_fk_constraints( - db: "MephistoDB", db_dump: dict, table_names_can_be_cleaned: Optional[List[str]] = None, + db: "MephistoDB", + db_dump: dict, + table_names_can_be_cleaned: Optional[List[str]] = None, ): table_names_can_be_cleaned = table_names_can_be_cleaned or [] with db.table_access_condition, db.get_connection() as conn: c = conn.cursor() - c.execute( - "PRAGMA foreign_keys = off;" - ) + c.execute("PRAGMA foreign_keys = off;") delete_queries = [] for table_name, rows in db_dump.items(): @@ -255,9 +259,7 @@ def delete_exported_data_without_fk_constraints( ) c.executescript("\n".join(delete_queries)) - c.execute( - "PRAGMA foreign_keys = on;" - ) + c.execute("PRAGMA foreign_keys = on;") def delete_entire_exported_data(db: "MephistoDB"): @@ -268,9 +270,7 @@ def delete_entire_exported_data(db: "MephistoDB"): with db.table_access_condition, db.get_connection() as conn: c = conn.cursor() - c.execute( - "PRAGMA foreign_keys = off;" - ) + c.execute("PRAGMA foreign_keys = off;") delete_queries = [] for table_name in table_names: @@ -281,31 +281,29 @@ def delete_entire_exported_data(db: "MephistoDB"): c.executescript("\n".join(delete_queries)) - c.execute( - "PRAGMA foreign_keys = on;" - ) + c.execute("PRAGMA foreign_keys = on;") def get_list_of_provider_types(db: "MephistoDB") -> List[str]: with db.table_access_condition, db.get_connection() as conn: c = conn.cursor() - c.execute( - "SELECT provider_type FROM requesters;" - ) + c.execute("SELECT provider_type FROM requesters;") rows = c.fetchall() return [r["provider_type"] for r in rows] def get_latest_row_from_table( - db: "MephistoDB", table_name: str, order_by: Optional[str] = "creation_date", + db: "MephistoDB", + table_name: str, + order_by: Optional[str] = "creation_date", ) -> Optional[dict]: with db.table_access_condition, db.get_connection() as conn: c = conn.cursor() c.execute( f""" SELECT * - FROM {table_name} - ORDER BY {order_by} DESC + FROM {table_name} + ORDER BY {order_by} DESC LIMIT 1; """, ) @@ -371,8 +369,25 @@ def get_list_of_tables_to_export(db: "MephistoDB") -> List[str]: return filtered_table_names +def get_list_of_available_labels(db: "MephistoDB") -> List[str]: + with db.table_access_condition, db.get_connection() as conn: + c = conn.cursor() + c.execute("SELECT data_labels FROM imported_data;") + rows = c.fetchall() + + labels = [] + for row in rows: + row_labels: List[List[str]] = json.loads(row["data_labels"]) + labels += row_labels + + return list(set(labels)) + + def check_if_row_with_params_exists( - db: "MephistoDB", table_name: str, params: dict, select_field: Optional[str] = 
"*", + db: "MephistoDB", + table_name: str, + params: dict, + select_field: Optional[str] = "*", ) -> bool: """ Check if row exists in `table_name` for passed dict of `params` @@ -391,8 +406,8 @@ def check_if_row_with_params_exists( c.execute( f""" - SELECT {select_field} - FROM {table_name} {where_string} + SELECT {select_field} + FROM {table_name} {where_string} LIMIT 1; """, execute_args, @@ -462,36 +477,56 @@ def mephisto_db_to_dict_for_task_runs( # Find and serialize `projects` project_ids = list(set(filter(bool, [i["project_id"] for i in dump_data["tasks"]]))) project_rows = select_rows_by_list_of_field_values( - db, "projects", ["project_id"], [project_ids], + db, + "projects", + ["project_id"], + [project_ids], ) dump_data["projects"] = serialize_data_for_table(project_rows) # Find and serialize `requesters` requester_ids = list(set(filter(bool, [i["requester_id"] for i in dump_data["task_runs"]]))) requester_rows = select_rows_by_list_of_field_values( - db, "requesters", ["requester_id"], [requester_ids], + db, + "requesters", + ["requester_id"], + [requester_ids], ) dump_data["requesters"] = serialize_data_for_table(requester_rows) # Find and serialize `workers` worker_ids = list(set(filter(bool, [i["worker_id"] for i in dump_data["units"]]))) worker_rows = select_rows_by_list_of_field_values( - db, "workers", ["worker_id"], [worker_ids], + db, + "workers", + ["worker_id"], + [worker_ids], ) dump_data["workers"] = serialize_data_for_table(worker_rows) # Find and serialize `granted_qualifications` granted_qualification_rows = select_rows_by_list_of_field_values( - db, "granted_qualifications", ["worker_id"], [worker_ids], + db, + "granted_qualifications", + ["worker_id"], + [worker_ids], ) dump_data["granted_qualifications"] = serialize_data_for_table(granted_qualification_rows) # Find and serialize `qualifications` - qualification_ids = list(set(filter( - bool, [i["qualification_id"] for i in dump_data["granted_qualifications"]], - ))) + qualification_ids = list( + set( + filter( + bool, + [i["qualification_id"] for i in dump_data["granted_qualifications"]], + ) + ) + ) qualification_rows = select_rows_by_list_of_field_values( - db, "qualifications", ["qualification_id"], [qualification_ids], + db, + "qualifications", + ["qualification_id"], + [qualification_ids], ) dump_data["qualifications"] = serialize_data_for_table(qualification_rows) @@ -560,7 +595,10 @@ def insert_new_row_in_table(db: "MephistoDB", table_name: str, row: dict): def update_row_in_table( - db: "MephistoDB", table_name: str, row: dict, pk_field_name: Optional[str] = None, + db: "MephistoDB", + table_name: str, + row: dict, + pk_field_name: Optional[str] = None, ): row = deepcopy(row) @@ -588,6 +626,7 @@ def update_row_in_table( # --- Decorators --- + def retry_generate_id(caught_excs: Optional[List[Type[Exception]]] = None): """ A decorator that attempts to call create DB entry until ID will be unique. 
@@ -597,6 +636,7 @@ def retry_generate_id(caught_excs: Optional[List[Type[Exception]]] = None):
     - db
     - table_name
     """
+
     def decorator(unreliable_fn: Callable):
         def wrapped_fn(*args, **kwargs):
             caught_excs_tuple = tuple(caught_excs or [Exception])
@@ -614,16 +654,22 @@ def wrapped_fn(*args, **kwargs):
                     # Otherwise, we just leave the error as is
                     exc_message = str(getattr(e, "original_exc", None) or "")
                     db = getattr(e, "db", None)
-                    table_name = getattr(e, "table_name", None)
-                    is_unique_constraint = exc_message.startswith("UNIQUE constraint")
+                    table_name = getattr(e, "table_name", "")
+                    pk_fieldname = get_table_pk_field_name(db, table_name=table_name)
+                    is_pk_unique_constraint = (
+                        exc_message.startswith("UNIQUE constraint")
+                        and f"{table_name}.{pk_fieldname}" in exc_message
+                    )

-                    if db and table_name and is_unique_constraint:
-                        pk_field_name = get_table_pk_field_name(db, table_name=table_name)
-                        if pk_field_name in exc_message:
-                            pk_exists = True
+                    if db and table_name and is_pk_unique_constraint:
+                        pk_exists = True
+                    else:
+                        # If we caught some other unique constraint, re-raise it
+                        raise

         # Set the original function name on the wrapped one.
         wrapped_fn.__name__ = unreliable_fn.__name__
         return wrapped_fn
+
     return decorator
diff --git a/mephisto/utils/misc.py b/mephisto/utils/misc.py
index ca5b35f7a..df168fa8b 100644
--- a/mephisto/utils/misc.py
+++ b/mephisto/utils/misc.py
@@ -15,9 +15,7 @@ def serialize_date_to_python(value: Any) -> datetime:
     # If integer timestamp
     if isinstance(value, int):
         timestamp_is_in_msec = len(str(value)) == 13
-        datetime_value = datetime.fromtimestamp(
-            value / 1000 if timestamp_is_in_msec else value
-        )
+        datetime_value = datetime.fromtimestamp(value / 1000 if timestamp_is_in_msec else value)
     # If datetime string
     else:
         datetime_value = dateutil_parse(str(value))
diff --git a/mephisto/utils/testing.py b/mephisto/utils/testing.py
index aa811be42..694ed3b20 100644
--- a/mephisto/utils/testing.py
+++ b/mephisto/utils/testing.py
@@ -222,7 +222,7 @@ def find_unit_reviews(
             SELECT * FROM unit_review
             WHERE (updated_qualification_id = ?1) OR
-                (revoked_qualification_id = ?1) AND 
+                (revoked_qualification_id = ?1) AND
                 (worker_id = ?2)
                 {task_query}
             ORDER BY creation_date ASC;
diff --git a/test/core/test_operator.py b/test/core/test_operator.py
index 5a7078279..bf6a97da1 100644
--- a/test/core/test_operator.py
+++ b/test/core/test_operator.py
@@ -15,6 +15,7 @@
 from unittest.mock import patch

 from tqdm import TMonitor  # type: ignore
+from mephisto.data_model.assignment import Assignment
 from mephisto.utils.testing import get_test_requester
 from mephisto.data_model.constants.assignment_state import AssignmentState
 from mephisto.abstractions.databases.local_database import LocalMephistoDB
@@ -90,7 +91,7 @@ def tearDown(self):
             f"Expected only main thread at teardown after {SHUTDOWN_TIMEOUT} seconds, found {target_threads}",
         )

-    def wait_for_complete_assignment(self, assignment, timeout: int):
+    def wait_for_complete_assignment(self, assignment: Assignment, timeout: int):
         start_time = time.time()
         while time.time() - start_time < timeout:
             if assignment.get_status() == AssignmentState.COMPLETED:
diff --git a/test/review_app/server/api/test_units_view.py b/test/review_app/server/api/test_units_view.py
index f9fc93d3b..3a4955388 100644
--- a/test/review_app/server/api/test_units_view.py
+++ b/test/review_app/server/api/test_units_view.py
@@ -76,14 +76,14 @@ def test_one_unit_with_unit_ids_success(self, *args, **kwargs):
         unit_1_id = get_test_unit(self.db)
         unit_1: Unit = Unit.get(self.db, unit_1_id)
         unit_2_id = self.db.new_unit(
-            unit_1.task_id,
-            unit_1.task_run_id,
-            unit_1.requester_id,
-            unit_1.db_id,
-            2,
-            1,
-            unit_1.provider_type,
-            unit_1.task_type,
+            task_id=unit_1.task_id,
+            task_run_id=unit_1.task_run_id,
+            requester_id=unit_1.requester_id,
+            assignment_id=unit_1.assignment_id,
+            unit_index=2,
+            pay_amount=1,
+            provider_type=unit_1.provider_type,
+            task_type=unit_1.task_type,
         )
         unit_2: Unit = Unit.get(self.db, unit_2_id)
         unit_1.set_db_status(AssignmentState.COMPLETED)
@@ -104,14 +104,14 @@ def test_two_units_with_unit_ids_success(self, *args, **kwargs):
         unit_1_id = get_test_unit(self.db)
         unit_1: Unit = Unit.get(self.db, unit_1_id)
         unit_2_id = self.db.new_unit(
-            unit_1.task_id,
-            unit_1.task_run_id,
-            unit_1.requester_id,
-            unit_1.db_id,
-            2,
-            1,
-            unit_1.provider_type,
-            unit_1.task_type,
+            task_id=unit_1.task_id,
+            task_run_id=unit_1.task_run_id,
+            requester_id=unit_1.requester_id,
+            assignment_id=unit_1.assignment_id,
+            unit_index=2,
+            pay_amount=1,
+            provider_type=unit_1.provider_type,
+            task_type=unit_1.task_type,
        )
         unit_2: Unit = Unit.get(self.db, unit_2_id)
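
To make the reworked `retry_generate_id` behavior above easier to review, here is a minimal usage sketch. It assumes the decorated helper raises `EntryAlreadyExistsException` populated with the `db`, `table_name`, and `original_exc` attributes the decorator inspects (as its docstring requires); the helper `create_row_with_random_id` itself is hypothetical and not part of this patch.

```python
from mephisto.utils.db import (
    EntryAlreadyExistsException,
    insert_new_row_in_table,
    retry_generate_id,
)


@retry_generate_id(caught_excs=[EntryAlreadyExistsException])
def create_row_with_random_id(db, table_name: str, row: dict) -> None:
    # Hypothetical creation helper. With this patch, the decorator retries the
    # call only when the failure is "UNIQUE constraint failed: <table>.<pk>"
    # (i.e. the generated primary key collided); any other UNIQUE constraint
    # violation is now re-raised to the caller instead of being retried.
    insert_new_row_in_table(db, table_name, row)
```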
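
Similarly, a small sketch of the new `get_list_of_available_labels` helper added to `mephisto/utils/db.py`, e.g. for checking a label before an export. The `LocalMephistoDB` import mirrors the one used in `test/core/test_operator.py`; the `wanted_label` value is invented for the example.

```python
from mephisto.abstractions.databases.local_database import LocalMephistoDB
from mephisto.utils.db import get_list_of_available_labels

db = LocalMephistoDB()

# Deduplicated list of all labels recorded in the `imported_data` table.
available_labels = get_list_of_available_labels(db)

wanted_label = "my_first_dump"  # hypothetical label from an earlier import
if wanted_label not in available_labels:
    print(f"Unknown label '{wanted_label}'; available: {sorted(available_labels)}")
```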