diff --git a/lean/commands/cloud/push.py b/lean/commands/cloud/push.py index 5b4c6c2e..67fce1dc 100644 --- a/lean/commands/cloud/push.py +++ b/lean/commands/cloud/push.py @@ -19,7 +19,6 @@ from lean.click import LeanCommand, PathParameter from lean.constants import PROJECT_CONFIG_FILE_NAME from lean.container import container -from lean.components.util.encryption_helper import get_project_key_hash from lean.models.encryption import ActionType @command(cls=LeanCommand) @@ -64,6 +63,7 @@ def push(project: Optional[Path], encrypt: Optional[bool], decrypt: Optional[boo raise RuntimeError(f"'{project}' is not a Lean project") if encrypt and key is not None: + from lean.components.util.encryption_helper import get_project_key_hash # lets check if the given key is registered with the cloud organization_id = container.organization_manager.try_get_working_organization_id() available_encryption_keys = container.api_client.encryption_keys.list(organization_id)['keys'] diff --git a/lean/commands/decrypt.py b/lean/commands/decrypt.py index 2dccf18f..29077603 100644 --- a/lean/commands/decrypt.py +++ b/lean/commands/decrypt.py @@ -17,7 +17,7 @@ from lean.click import LeanCommand, PathParameter from lean.container import container -from lean.components.util.encryption_helper import get_decrypted_file_content_for_project + @command(cls=LeanCommand) @argument("project", type=PathParameter(exists=True, file_okay=False, dir_okay=True)) @@ -38,24 +38,19 @@ def decrypt(project: Path, # Check if the project is already decrypted if not project_config.get("encrypted", False): + logger.info(f"Successfully decrypted project {project}") return decryption_key: Path = project_config.get('encryption-key-path', None) - if decryption_key is not None and Path(decryption_key).exists(): - decryption_key = Path(decryption_key) - - if decryption_key is None and key is None: - raise RuntimeError("No decryption key was provided, please provide one using --key") - elif decryption_key is None: - decryption_key = key - elif key is not None and decryption_key != key: - raise RuntimeError(f"Provided decryption key ({key}) does not match the decryption key in the project ({decryption_key})") + from lean.components.util.encryption_helper import get_and_validate_user_input_encryption_key + decryption_key = get_and_validate_user_input_encryption_key(key, decryption_key) organization_id = container.organization_manager.try_get_working_organization_id() source_files = project_manager.get_source_files(project) try: - decrypted_data = get_decrypted_file_content_for_project(project, + from lean.components.util.encryption_helper import get_decrypted_file_content_for_local_project + decrypted_data = get_decrypted_file_content_for_local_project(project, source_files, decryption_key, project_config_manager, organization_id) except Exception as e: raise RuntimeError(f"Could not decrypt project {project}: {e}") diff --git a/lean/commands/encrypt.py b/lean/commands/encrypt.py index d38d2f69..c9b12fc3 100644 --- a/lean/commands/encrypt.py +++ b/lean/commands/encrypt.py @@ -17,7 +17,6 @@ from lean.click import LeanCommand, PathParameter from lean.container import container -from lean.components.util.encryption_helper import get_encrypted_file_content_for_project @command(cls=LeanCommand) @@ -40,24 +39,19 @@ def encrypt(project: Path, # Check if the project is already encrypted if project_config.get('encrypted', False): + logger.info(f"Local files encrypted successfully.") return encryption_key: Path = project_config.get('encryption-key-path', None) - if encryption_key is not None and Path(encryption_key).exists(): - encryption_key = Path(encryption_key) - - if encryption_key is None and key is None: - raise RuntimeError("No encryption key was provided, please provide one using --key") - elif encryption_key is None: - encryption_key = key - elif key is not None and encryption_key != key: - raise RuntimeError(f"Provided encryption key ({key}) does not match the encryption key in the project ({encryption_key})") + from lean.components.util.encryption_helper import get_and_validate_user_input_encryption_key + encryption_key = get_and_validate_user_input_encryption_key(key, encryption_key) organization_id = container.organization_manager.try_get_working_organization_id() source_files = project_manager.get_source_files(project) try: - encrypted_data = get_encrypted_file_content_for_project(project, + from lean.components.util.encryption_helper import get_encrypted_file_content_for_local_project + encrypted_data = get_encrypted_file_content_for_local_project(project, source_files, encryption_key, project_config_manager, organization_id) except Exception as e: raise RuntimeError(f"Could not encrypt project {project}: {e}") diff --git a/lean/components/cloud/pull_manager.py b/lean/components/cloud/pull_manager.py index d78cd6ff..3855b0d8 100644 --- a/lean/components/cloud/pull_manager.py +++ b/lean/components/cloud/pull_manager.py @@ -25,8 +25,6 @@ from lean.models.utils import LeanLibraryReference from lean.models.encryption import ActionType from lean.components.config.storage import safe_save -from lean.components.util.encryption_helper import get_decrypted_content_from_cloud_project, \ - get_encrypted_content_from_cloud_project, get_project_key_hash class PullManager: """The PullManager class is responsible for synchronizing cloud projects to the local drive.""" @@ -182,10 +180,8 @@ def _pull_project(self, project: QCProject, encryption_action: Optional[ActionTy local_encryption_state = project_config.get("encrypted", False) # Handle mismatch cases - if not encryption_key and bool(project.encrypted) != bool(local_encryption_state): - raise RuntimeError(f"Project encryption state mismatch. Please provide encryption key to pull project '{project.name}'") - if getattr(project.encryptionKey, 'id', None) and encryption_key and get_project_key_hash(encryption_key) != project.encryptionKey.id: - raise RuntimeError(f"Encryption Key mismatch. Please provide correct encryption key to pull project '{project.name}'") + from lean.components.util.encryption_helper import validate_key_and_encryption_state_for_cloud_project + validate_key_and_encryption_state_for_cloud_project(project, local_encryption_state, encryption_key) # Pull the cloud files to the local drive self._pull_files(project, local_project_path, encryption_action, encryption_key) @@ -198,9 +194,10 @@ def _pull_project(self, project: QCProject, encryption_action: Optional[ActionTy project_config.set("description", project.description) project_config.set("organization-id", project.organizationId) project_config.set("python-venv", project.leanEnvironment) - project_config.set('encrypted', project.encrypted) if encryption_key: project_config.set("encrypted", encryption_action == ActionType.ENCRYPT) + else: + project_config.set('encrypted', project.encrypted) if not project.leanPinnedToMaster: project_config.set("lean-engine", project.leanVersionId) @@ -220,11 +217,9 @@ def _pull_files(self, project: QCProject, local_project_path: Path, encryption_a cloud_files = self._api_client.files.get_all(project.projectId) if encryption_key: + from lean.components.util.encryption_helper import get_appropriate_files_from_cloud_project organization_id = self._organization_manager.try_get_working_organization_id() - if encryption_action == ActionType.DECRYPT: - cloud_files = get_decrypted_content_from_cloud_project(project, cloud_files, encryption_key, organization_id) - if encryption_action == ActionType.ENCRYPT: - cloud_files = get_encrypted_content_from_cloud_project(project, cloud_files, encryption_key, organization_id) + cloud_files = get_appropriate_files_from_cloud_project(project, cloud_files, encryption_key, organization_id, encryption_action) for cloud_file in cloud_files: self._last_file = cloud_file.name diff --git a/lean/components/cloud/push_manager.py b/lean/components/cloud/push_manager.py index 9bc8d67e..ecd48686 100644 --- a/lean/components/cloud/push_manager.py +++ b/lean/components/cloud/push_manager.py @@ -21,8 +21,6 @@ from lean.components.util.project_manager import ProjectManager from lean.models.api import QCLanguage, QCProject from lean.models.utils import LeanLibraryReference -from lean.components.util.encryption_helper import get_encrypted_file_content_for_project,\ - get_project_key_hash, get_decrypted_file_content_for_project from lean.models.encryption import ActionType class PushManager: @@ -150,10 +148,8 @@ def _push_project(self, project_path: Path, organization_id: str, encryption_act self._logger.info(f"Successfully created cloud project '{cloud_project.name}'{organization_message_part}") # Handle mismatch cases - if not encryption_key and bool(cloud_project.encrypted) != bool(local_encryption_state): - raise RuntimeError(f"Project encryption state mismatch. Please provide encryption key to push the project.") - if getattr(cloud_project.encryptionKey, 'id', None) and encryption_key and get_project_key_hash(encryption_key) != cloud_project.encryptionKey.id: - raise RuntimeError(f"Project encryption key mismatch. Please provide correct encryption key to push the project.") + from lean.components.util.encryption_helper import validate_key_and_encryption_state_for_cloud_project + validate_key_and_encryption_state_for_cloud_project(cloud_project, local_encryption_state, encryption_key) # Finalize pushing by updating locally modified metadata, files and libraries self._push_metadata(project_path, cloud_project, encryption_action, encryption_key) @@ -164,14 +160,10 @@ def _get_files(self, project: Path, encryption_action: Optional[ActionType], enc :param project: the local project to push the files of """ paths = self._project_manager.get_source_files(project) - if (encryption_key): + if encryption_key: + from lean.components.util.encryption_helper import get_appropriate_files_from_local_project organization_id = self._organization_manager.try_get_working_organization_id() - if encryption_action == ActionType.ENCRYPT: - data = get_encrypted_file_content_for_project(project, paths, - encryption_key, self._project_config_manager, organization_id) - else: - data = get_decrypted_file_content_for_project(project, - paths, encryption_key, self._project_config_manager, organization_id) + data = get_appropriate_files_from_local_project(project, paths, encryption_key, self._project_config_manager, organization_id, encryption_action) files = [ { 'name': path.relative_to(project).as_posix(), @@ -243,6 +235,7 @@ def _push_metadata(self, project: Path, cloud_project: QCProject, encryption_act if (encryption_action is not None and encryption_key is not None): if encryption_action == ActionType.ENCRYPT: + from lean.components.util.encryption_helper import get_project_key_hash encryption_key_id = get_project_key_hash(encryption_key) update_args["encryption_key"] = encryption_key_id else: diff --git a/lean/components/util/encryption_helper.py b/lean/components/util/encryption_helper.py index 648cedea..01ac20b5 100644 --- a/lean/components/util/encryption_helper.py +++ b/lean/components/util/encryption_helper.py @@ -11,7 +11,6 @@ # See the License for the specific language governing permissions and # limitations under the License. -import re from typing import List from pathlib import Path from base64 import b64decode, b64encode @@ -19,10 +18,10 @@ from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes from cryptography.hazmat.backends import default_backend from cryptography.hazmat.primitives import hashes -from cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2HMAC - +from lean.models.encryption import ActionType from lean.components.config.project_config_manager import ProjectConfigManager from lean.models.api import QCProject, QCFullFile +from lean.components.config.storage import Storage def calculate_md5(input_string: str): """Calculate the md5 hash of a string @@ -30,8 +29,8 @@ def calculate_md5(input_string: str): :param input_string: The string to hash :return: The md5 hash of the string """ - import hashlib - return hashlib.md5(input_string.encode()).hexdigest() + from hashlib import md5 + return md5(input_string.encode()).hexdigest() def get_b64_encoded(key: str) -> bytes: """Encode a string to base64 @@ -71,16 +70,14 @@ def get_project_iv(project_key_path: Path): key_id = get_project_key_hash(project_key_path) return key_id[:16] -def get_decrypted_file_content_for_project(project: Path, source_files: List[Path], encryption_key: Path, project_config_manager: ProjectConfigManager, organization_id: str) -> List[str]: +def get_decrypted_file_content_for_local_project(project: Path, source_files: List[Path], encryption_key: Path, project_config_manager: ProjectConfigManager, organization_id: str) -> List[str]: project_config = project_config_manager.get_project_config(project) # Check if the project is already encrypted areProjectFilesAlreadyEncrypted = project_config.get('encrypted', False) # Check if there is mismatch of keys - local_encryption_key_path = project_config.get('encryption-key-path', None) - if local_encryption_key_path and encryption_key and Path(local_encryption_key_path) != encryption_key: - raise RuntimeError(f"Registered encryption key {local_encryption_key_path} is different from the one provided {encryption_key}") + _validate_key_state_for_local_project(project_config, encryption_key) project_key = get_project_key(encryption_key, organization_id) project_iv = get_project_iv(encryption_key) @@ -93,39 +90,20 @@ def get_decrypted_file_content_for_project(project: Path, source_files: List[Pat if not areProjectFilesAlreadyEncrypted: decrypted = encrypted else: - decrypted = decrypt_file_content(get_b64_encoded(project_key), get_b64_encoded(project_iv), encrypted) + decrypted = _decrypt_file_content(get_b64_encoded(project_key), get_b64_encoded(project_iv), encrypted) decrypted_data.append(decrypted) except Exception as e: raise RuntimeError(f"Failed to decrypt file {file} with error {e}") return decrypted_data -def decrypt_file_content(b64_encoded_key: bytes, b64_encoded_iv: bytes, b64_encoded_content: str) -> str: - # remove new line characters that we added during encryption - b64_encoded_content = b64_encoded_content.replace('\n', '') - b64_encoded_content = b64_encoded_content.strip() - content = b64decode(b64_encoded_content.encode('utf-8')) - key = b64decode(b64_encoded_key) - init_vector = b64decode(b64_encoded_iv) - # Setup module-specific classes - cipher = Cipher(algorithms.AES(key), modes.CBC(init_vector)) - decryptor = cipher.decryptor() - - plaintext = decryptor.update(content) + decryptor.finalize() - # Unpad the decrypted data - unpadder = padding.PKCS7(128).unpadder() - unpadded_data = unpadder.update(plaintext) + unpadder.finalize() - return unpadded_data.decode('utf-8').replace("\r\n", "\n") - -def get_encrypted_file_content_for_project(project: Path, source_files: List[Path], encryption_key: Path, project_config_manager: ProjectConfigManager, organization_id: str) -> List[str]: +def get_encrypted_file_content_for_local_project(project: Path, source_files: List[Path], encryption_key: Path, project_config_manager: ProjectConfigManager, organization_id: str) -> List[str]: project_config = project_config_manager.get_project_config(project) # Check if the project is already encrypted areProjectFilesAlreadyEncrypted = project_config.get('encrypted', False) # Check if there is mismatch of keys - local_encryption_key_path = project_config.get('encryption-key-path', None) - if local_encryption_key_path and encryption_key and Path(local_encryption_key_path) != encryption_key: - raise RuntimeError(f"Registered encryption key {local_encryption_key_path} is different from the one provided {encryption_key}") + _validate_key_state_for_local_project(project_config, encryption_key) project_key = get_project_key(encryption_key, organization_id) project_iv = get_project_iv(encryption_key) @@ -138,13 +116,63 @@ def get_encrypted_file_content_for_project(project: Path, source_files: List[Pat if areProjectFilesAlreadyEncrypted: encrypted = plain_text else: - encrypted = encrypt_file_content(get_b64_encoded(project_key), get_b64_encoded(project_iv), plain_text) + encrypted = _encrypt_file_content(get_b64_encoded(project_key), get_b64_encoded(project_iv), plain_text) encrypted_data.append(encrypted) except Exception as e: raise RuntimeError(f"Failed to encrypt file {file} with error {e}") return encrypted_data -def encrypt_file_content(b64_encoded_key: bytes, b64_encoded_iv: bytes, content: bytes) -> str: +def get_and_validate_user_input_encryption_key(user_input_key: Path | None, project_config_encryption_key: Path | None) -> str: + if project_config_encryption_key is not None and Path(project_config_encryption_key).exists(): + project_config_encryption_key = Path(project_config_encryption_key) + if project_config_encryption_key is None and user_input_key is None: + raise RuntimeError("No encryption key was provided, please provide one using --key") + elif project_config_encryption_key is None: + project_config_encryption_key = user_input_key + elif user_input_key is not None and project_config_encryption_key != user_input_key: + raise RuntimeError(f"Provided encryption key ({user_input_key}) does not match the encryption key in the project ({project_config_encryption_key})") + return project_config_encryption_key + +def validate_key_and_encryption_state_for_cloud_project(project: QCProject, local_project_encryption_state: bool, encryption_key: Path) -> None: + if not encryption_key and bool(project.encrypted) != bool(local_project_encryption_state): + raise RuntimeError(f"Project encryption state mismatch. Local Project Encrypted: {bool(local_project_encryption_state)}. Cloud Project Encrypted {bool(project.encrypted)}. Please provide encryption key to pull project '{project.name}'") + if project.encryptionKey and encryption_key and get_project_key_hash(encryption_key) != project.encryptionKey.id: + raise RuntimeError(f"Encryption Key mismatch. Local Project Key hash: {get_project_key_hash(encryption_key)}. Cloud Project Key Hash {project.encryptionKey.id}. Please provide correct encryption key to pull project '{project.name}'") + +def get_appropriate_files_from_cloud_project(project: QCProject, cloud_files: List[QCFullFile], encryption_key: Path, organization_id: str, encryption_action: ActionType) -> List[QCFullFile]: + if encryption_action == ActionType.DECRYPT: + return _get_decrypted_content_from_cloud_project(project, cloud_files, encryption_key, organization_id) + return _get_encrypted_content_from_cloud_project(project, cloud_files, encryption_key, organization_id) + +def get_appropriate_files_from_local_project(project: Path, paths: List[Path], encryption_key: Path, project_config_manager: ProjectConfigManager, organization_id: str, encryption_action: ActionType) -> List[str]: + if encryption_action == ActionType.ENCRYPT: + return get_encrypted_file_content_for_local_project(project, paths, encryption_key, project_config_manager, organization_id) + return get_decrypted_file_content_for_local_project(project, paths, encryption_key,project_config_manager, organization_id) + + +def _validate_key_state_for_local_project(project_config: Storage, encryption_key: Path): + local_encryption_key_path = project_config.get('encryption-key-path', None) + if local_encryption_key_path and encryption_key and Path(local_encryption_key_path) != encryption_key: + raise RuntimeError(f"Registered encryption key {local_encryption_key_path} is different from the one provided {encryption_key}") + +def _decrypt_file_content(b64_encoded_key: bytes, b64_encoded_iv: bytes, b64_encoded_content: str) -> str: + # remove new line characters that we added during encryption + b64_encoded_content = b64_encoded_content.replace('\n', '') + b64_encoded_content = b64_encoded_content.strip() + content = b64decode(b64_encoded_content.encode('utf-8')) + key = b64decode(b64_encoded_key) + init_vector = b64decode(b64_encoded_iv) + # Setup module-specific classes + cipher = Cipher(algorithms.AES(key), modes.CBC(init_vector)) + decryptor = cipher.decryptor() + + plaintext = decryptor.update(content) + decryptor.finalize() + # Unpad the decrypted data + unpadder = padding.PKCS7(128).unpadder() + unpadded_data = unpadder.update(plaintext) + unpadder.finalize() + return unpadded_data.decode('utf-8').replace("\r\n", "\n") + +def _encrypt_file_content(b64_encoded_key: bytes, b64_encoded_iv: bytes, content: bytes) -> str: key = b64decode(b64_encoded_key) init_vector = b64decode(b64_encoded_iv) plain_text = _pad(content, 16) @@ -157,10 +185,11 @@ def encrypt_file_content(b64_encoded_key: bytes, b64_encoded_iv: bytes, content: encrypted_content = b64encode(cipher_text).decode('utf-8') # let's make it user friendly and add new lines, same as local platform regex = r".{1,80}" - chunks = re.findall(regex, encrypted_content) + from re import findall + chunks = findall(regex, encrypted_content) return '\n'.join(chunks) -def get_decrypted_content_from_cloud_project(project: QCProject, cloud_files: List[QCFullFile], encryption_key: Path, organization_id: str) -> List[QCFullFile]: +def _get_decrypted_content_from_cloud_project(project: QCProject, cloud_files: List[QCFullFile], encryption_key: Path, organization_id: str) -> List[QCFullFile]: # Check if the project is already encrypted if not project.encrypted or project.encryptionKey.id != get_project_key_hash(encryption_key): return cloud_files @@ -169,13 +198,13 @@ def get_decrypted_content_from_cloud_project(project: QCProject, cloud_files: Li project_iv = get_project_iv(encryption_key) for cloud_file in cloud_files: try: - decrypted = decrypt_file_content(get_b64_encoded(project_key), get_b64_encoded(project_iv), cloud_file.content) + decrypted = _decrypt_file_content(get_b64_encoded(project_key), get_b64_encoded(project_iv), cloud_file.content) cloud_file.content = decrypted except Exception as e: raise RuntimeError(f"Failed to decrypt file {cloud_file} with error {e}") return cloud_files -def get_encrypted_content_from_cloud_project(project: QCProject, cloud_files: List[QCFullFile], encryption_key: Path, organization_id: str) -> List[QCFullFile]: +def _get_encrypted_content_from_cloud_project(project: QCProject, cloud_files: List[QCFullFile], encryption_key: Path, organization_id: str) -> List[QCFullFile]: # Check if the project is already encrypted if project.encrypted: if encryption_key is not None and project.encryptionKey and project.encryptionKey.id != get_project_key_hash(encryption_key): @@ -186,7 +215,7 @@ def get_encrypted_content_from_cloud_project(project: QCProject, cloud_files: Li project_iv = get_project_iv(encryption_key) for cloud_file in cloud_files: try: - encrypted = encrypt_file_content(get_b64_encoded(project_key), get_b64_encoded(project_iv), cloud_file.content.encode('utf-8')) + encrypted = _encrypt_file_content(get_b64_encoded(project_key), get_b64_encoded(project_iv), cloud_file.content.encode('utf-8')) cloud_file.content = encrypted except Exception as e: raise RuntimeError(f"Failed to decrypt file {cloud_file} with error {e}") @@ -203,6 +232,7 @@ def _pad(data, block_size): return data + bytes([padding_length] * padding_length) def _get_fixed_length_key_from_user_full_length_key(password: str, salt: bytes): + from cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2HMAC kdf = PBKDF2HMAC( algorithm=hashes.SHA256(), length=16, @@ -212,3 +242,4 @@ def _get_fixed_length_key_from_user_full_length_key(password: str, salt: bytes): ) return kdf.derive(password.encode()).hex() + diff --git a/tests/commands/cloud/test_push.py b/tests/commands/cloud/test_push.py index 23d272bf..5c126e45 100644 --- a/tests/commands/cloud/test_push.py +++ b/tests/commands/cloud/test_push.py @@ -116,7 +116,7 @@ def test_cloud_push_aborts_when_given_directory_does_not_exist() -> None: def test_cloud_push_updates_lean_config() -> None: create_fake_lean_cli_directory() - + project_path = Path.cwd() / "Python Project" cloud_project = create_api_project(1, "Python Project") api_client = mock.Mock() api_client.projects.create = mock.MagicMock(return_value=cloud_project) @@ -127,25 +127,18 @@ def test_cloud_push_updates_lean_config() -> None: api_client.projects.get_all = mock.MagicMock(return_value=[cloud_project]) api_client.projects.get = mock.MagicMock(return_value=create_api_project(1, "Python Project")) - project_config = mock.Mock() - project_config.get = mock.MagicMock(side_effect=[None, "Python", "", {}, -1, None, []]) - - project_config_manager = mock.Mock() - project_config_manager.get_project_config = mock.MagicMock(return_value=project_config) - - project_manager = mock.Mock() - project_manager.get_source_files = mock.MagicMock(return_value=[]) - project_manager.get_project_libraries = mock.MagicMock(return_value=[]) - - push_manager = PushManager(mock.Mock(), api_client, project_manager, project_config_manager, mock.Mock()) + init_container(api_client_to_use=api_client) - init_container(push_manager_to_use=push_manager, api_client_to_use=api_client) + project_config = container.project_config_manager.get_project_config(project_path) + assert project_config.get("organization-id", None) == None + assert cloud_project.organizationId == "123" result = CliRunner().invoke(lean, ["cloud", "push", "--project", "Python Project"]) assert result.exit_code == 0 - project_config.set.assert_called_with("organization-id", "123") + project_config = container.project_config_manager.get_project_config(project_path) + assert project_config.get("organization-id", None) == "123" def test_cloud_push_aborts_when_encrypting_without_key_given() -> None: