diff --git a/components/clp-py-utils/clp_py_utils/s3_utils.py b/components/clp-py-utils/clp_py_utils/s3_utils.py
index f35ea9233..288dbcc8f 100644
--- a/components/clp-py-utils/clp_py_utils/s3_utils.py
+++ b/components/clp-py-utils/clp_py_utils/s3_utils.py
@@ -1,9 +1,10 @@
 import re
 from pathlib import Path
-from typing import List, Tuple
+from typing import List, Optional, Tuple
 
 import boto3
 from botocore.config import Config
+from botocore.credentials import ReadOnlyCredentials
 from job_orchestration.scheduler.job_config import S3InputConfig
 
 from clp_py_utils.clp_config import S3Config
@@ -61,6 +62,15 @@ def generate_s3_virtual_hosted_style_url(
     return f"https://{bucket_name}.s3.{region_code}.{AWS_ENDPOINT}/{object_key}"
 
 
+def s3_get_frozen_credentials() -> Optional[ReadOnlyCredentials]:
+    session = boto3.Session()
+    credentials = session.get_credentials()
+    if credentials is None:
+        return None
+    frozen_credentials = credentials.get_frozen_credentials()
+    return frozen_credentials
+
+
 def s3_get_object_metadata(s3_input_config: S3InputConfig) -> List[FileMetadata]:
     """
     Gets the metadata of all objects under the <bucket>/<key_prefix> specified by s3_input_config.
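Note: `s3_get_frozen_credentials` defers to boto3's default credential resolution chain (environment variables, the shared credentials file, or an attached IAM role). A minimal sketch of what a caller sees, not part of this diff; it assumes the clp-py-utils package is importable and some credential source is visible to the worker:

```python
# Illustrative sketch, not part of the change. Shows the shape of the helper's
# return value under boto3's default credential chain.
from clp_py_utils.s3_utils import s3_get_frozen_credentials

frozen = s3_get_frozen_credentials()
if frozen is None:
    print("No credentials resolvable from the local session")
else:
    # ReadOnlyCredentials is a namedtuple of (access_key, secret_key, token);
    # token is None for long-lived keys and set for temporary (STS) credentials.
    print(f"has_token={frozen.token is not None}")
```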
diff --git a/components/job-orchestration/job_orchestration/executor/compress/compression_task.py b/components/job-orchestration/job_orchestration/executor/compress/compression_task.py
index c17b0fd1a..23713211b 100644
--- a/components/job-orchestration/job_orchestration/executor/compress/compression_task.py
+++ b/components/job-orchestration/job_orchestration/executor/compress/compression_task.py
@@ -23,6 +23,7 @@
 from clp_py_utils.s3_utils import generate_s3_virtual_hosted_style_url, s3_put
 from clp_py_utils.sql_adapter import SQL_Adapter
 from job_orchestration.executor.compress.celery import app
+from job_orchestration.executor.utils import load_session_credentials
 from job_orchestration.scheduler.constants import CompressionTaskStatus
 from job_orchestration.scheduler.job_config import (
     ClpIoConfig,
@@ -162,7 +163,7 @@ def make_clp_s_command_and_env(
     clp_config: ClpIoConfig,
     db_config_file_path: pathlib.Path,
     use_single_file_archive: bool,
-) -> Tuple[List[str], Optional[Dict[str, str]]]:
+) -> Tuple[Optional[List[str]], Optional[Dict[str, str]]]:
     """
     Generates the command and environment variables for a clp_s compression job.
     :param clp_home:
@@ -185,10 +186,17 @@ def make_clp_s_command_and_env(
     # fmt: on
 
     if InputType.S3 == clp_config.input.type:
+        aws_access_key_id = clp_config.input.aws_access_key_id
+        aws_secret_access_key = clp_config.input.aws_secret_access_key
+        if aws_access_key_id is None or aws_secret_access_key is None:
+            aws_access_key_id, aws_secret_access_key = load_session_credentials(logger)
+            if aws_access_key_id is None or aws_secret_access_key is None:
+                return None, None
+
         compression_env_vars = {
             **os.environ,
-            "AWS_ACCESS_KEY_ID": clp_config.input.aws_access_key_id,
-            "AWS_SECRET_ACCESS_KEY": clp_config.input.aws_secret_access_key,
+            "AWS_ACCESS_KEY_ID": aws_access_key_id,
+            "AWS_SECRET_ACCESS_KEY": aws_secret_access_key,
         }
         compression_cmd.append("--auth")
         compression_cmd.append("s3")
@@ -276,6 +284,11 @@ def run_clp(
         logger.error(f"Unsupported storage engine {clp_storage_engine}")
         return False, {"error_message": f"Unsupported storage engine {clp_storage_engine}"}
 
+    if compression_cmd is None:
+        error_msg = "Error creating compression command"
+        logger.error(error_msg)
+        return False, {"error_message": error_msg}
+
     # Generate list of logs to compress
     input_type = clp_config.input.type
     logs_list_path = data_dir / f"{instance_id_str}-log-paths.txt"
diff --git a/components/job-orchestration/job_orchestration/executor/query/extract_stream_task.py b/components/job-orchestration/job_orchestration/executor/query/extract_stream_task.py
index f80477a48..f4af7424f 100644
--- a/components/job-orchestration/job_orchestration/executor/query/extract_stream_task.py
+++ b/components/job-orchestration/job_orchestration/executor/query/extract_stream_task.py
@@ -8,14 +8,18 @@
 from celery.utils.log import get_task_logger
 from clp_py_utils.clp_config import Database, S3Config, StorageEngine, StorageType, WorkerConfig
 from clp_py_utils.clp_logging import set_logging_level
-from clp_py_utils.s3_utils import generate_s3_virtual_hosted_style_url, s3_put
+from clp_py_utils.s3_utils import (
+    generate_s3_virtual_hosted_style_url,
+    s3_get_frozen_credentials,
+    s3_put,
+)
 from clp_py_utils.sql_adapter import SQL_Adapter
 from job_orchestration.executor.query.celery import app
 from job_orchestration.executor.query.utils import (
     report_task_failure,
     run_query_task,
 )
-from job_orchestration.executor.utils import load_worker_config
+from job_orchestration.executor.utils import load_session_credentials, load_worker_config
 from job_orchestration.scheduler.job_config import ExtractIrJobConfig, ExtractJsonJobConfig
 from job_orchestration.scheduler.scheduler_data import QueryTaskStatus
 
@@ -103,8 +107,10 @@ def _make_clp_s_command_and_env_vars(
     # fmt: on
     aws_access_key_id, aws_secret_access_key = s3_config.get_credentials()
     if aws_access_key_id is None or aws_secret_access_key is None:
-        logger.error("Missing credentials for accessing archives on S3")
-        return None, None
+        aws_access_key_id, aws_secret_access_key = load_session_credentials(logger)
+        if aws_access_key_id is None or aws_secret_access_key is None:
+            return None, None
+
     env_vars = {
         **os.environ,
         "AWS_ACCESS_KEY_ID": aws_access_key_id,
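The same fallback now appears in all three tasks: explicit credentials from the job or worker config take precedence, and only when they are absent does the task consult the worker's local session. A condensed, illustrative version of the shared pattern; `resolve_s3_env_vars` and `explicit` are stand-in names (standing in for `clp_config.input.*` or `s3_config.get_credentials()`), not helpers in this diff:

```python
# Condensed sketch of the fallback shared by the three tasks; not part of
# the change itself.
import logging
import os
from typing import Dict, Optional, Tuple

from job_orchestration.executor.utils import load_session_credentials


def resolve_s3_env_vars(
    explicit: Tuple[Optional[str], Optional[str]], logger: logging.Logger
) -> Optional[Dict[str, str]]:
    access_key, secret_key = explicit
    if access_key is None or secret_key is None:
        # Fall back to the worker's ambient AWS session (env vars, config
        # files, or an attached IAM role).
        access_key, secret_key = load_session_credentials(logger)
        if access_key is None or secret_key is None:
            return None  # Caller logs the failure and aborts the task.
    return {**os.environ, "AWS_ACCESS_KEY_ID": access_key, "AWS_SECRET_ACCESS_KEY": secret_key}
```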
diff --git a/components/job-orchestration/job_orchestration/executor/query/fs_search_task.py b/components/job-orchestration/job_orchestration/executor/query/fs_search_task.py
index e01d01a4e..4b7f89d04 100644
--- a/components/job-orchestration/job_orchestration/executor/query/fs_search_task.py
+++ b/components/job-orchestration/job_orchestration/executor/query/fs_search_task.py
@@ -14,7 +14,7 @@
     report_task_failure,
     run_query_task,
 )
-from job_orchestration.executor.utils import load_worker_config
+from job_orchestration.executor.utils import load_session_credentials, load_worker_config
 from job_orchestration.scheduler.job_config import SearchJobConfig
 
 # Setup logging
@@ -73,8 +73,10 @@ def _make_core_clp_s_command_and_env_vars(
     # fmt: on
     aws_access_key_id, aws_secret_access_key = s3_config.get_credentials()
     if aws_access_key_id is None or aws_secret_access_key is None:
-        logger.error("Missing credentials for accessing archives on S3")
-        return None, None
+        aws_access_key_id, aws_secret_access_key = load_session_credentials(logger)
+        if aws_access_key_id is None or aws_secret_access_key is None:
+            return None, None
+
     env_vars = {
         **os.environ,
         "AWS_ACCESS_KEY_ID": aws_access_key_id,
diff --git a/components/job-orchestration/job_orchestration/executor/utils.py b/components/job-orchestration/job_orchestration/executor/utils.py
index 47ea702ae..2b539b74e 100644
--- a/components/job-orchestration/job_orchestration/executor/utils.py
+++ b/components/job-orchestration/job_orchestration/executor/utils.py
@@ -1,9 +1,10 @@
 from logging import Logger
 from pathlib import Path
-from typing import Optional
+from typing import Optional, Tuple
 
 from clp_py_utils.clp_config import WorkerConfig
 from clp_py_utils.core import read_yaml_config_file
+from clp_py_utils.s3_utils import s3_get_frozen_credentials
 
 
 def load_worker_config(
@@ -21,3 +22,14 @@ def load_worker_config(
     except Exception:
         logger.exception("Failed to load worker config")
         return None
+
+
+def load_session_credentials(logger: Logger) -> Tuple[Optional[str], Optional[str]]:
+    s3_frozen_credentials = s3_get_frozen_credentials()
+    if s3_frozen_credentials is None:
+        logger.error("Failed to get S3 credentials from local session")
+        return None, None
+    if s3_frozen_credentials.token is not None:
+        logger.error("Session tokens are not currently supported")
+        return None, None
+    return s3_frozen_credentials.access_key, s3_frozen_credentials.secret_key
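Because the spawned clp_s process is only handed AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY, temporary STS credentials (which would also require AWS_SESSION_TOKEN) are rejected up front in `load_session_credentials` rather than failing later inside clp_s. A quick, illustrative way to check whether a worker environment will pass this gate; the snippet is not part of the change:

```python
# Illustrative check: does this environment yield long-lived credentials
# that load_session_credentials will accept?
import logging

from job_orchestration.executor.utils import load_session_credentials

logging.basicConfig(level=logging.INFO)
access_key, secret_key = load_session_credentials(logging.getLogger(__name__))
print("ok" if access_key and secret_key else "no usable long-lived credentials")
```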