Skip to content

Commit

Permalink
Type annotations and refactorings of input staging
Browse files Browse the repository at this point in the history
  • Loading branch information
nsoranzo committed Nov 9, 2022
1 parent 339a7b1 commit 6b752ca
Show file tree
Hide file tree
Showing 8 changed files with 217 additions and 157 deletions.
4 changes: 2 additions & 2 deletions lib/galaxy/selenium/navigates_galaxy.py
Original file line number Diff line number Diff line change
Expand Up @@ -350,12 +350,12 @@ def history_contents(self, history_id=None, view="summary", datasets_only=True):
endpoint = f"histories/{history_id}?view={view}"
return self.api_get(endpoint)

def current_history(self):
def current_history(self) -> Dict[str, Any]:
    """Return the Galaxy server's current-history record as a dictionary.

    Bypasses Selenium and issues a plain HTTP GET against the
    ``history/current_history_json`` endpoint, authenticating with the
    cookies carried over from the browser session.
    """
    url = self.build_url("history/current_history_json", for_selenium=False)
    cookies = self.selenium_to_requests_cookies()
    return requests.get(url, cookies=cookies, timeout=DEFAULT_SOCKET_TIMEOUT).json()

def current_history_id(self):
def current_history_id(self) -> str:
    """Return the encoded id of the server's current history."""
    history = self.current_history()
    return history["id"]

def latest_history_item(self):
Expand Down
104 changes: 55 additions & 49 deletions lib/galaxy/tool_util/client/staging.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,17 +7,31 @@
import json
import logging
import os
from typing import (
Any,
BinaryIO,
Dict,
List,
Optional,
Tuple,
TYPE_CHECKING,
)

import yaml
from typing_extensions import Literal

from galaxy.tool_util.cwl.util import (
DirectoryUploadTarget,
FileLiteralTarget,
FileUploadTarget,
galactic_job_json,
ObjectUploadTarget,
path_or_uri_to_uri,
UploadTarget,
)

if TYPE_CHECKING:
from galaxy_test.base.api import ApiTestInteractor

log = logging.getLogger(__name__)

UPLOAD_TOOL_ID = "upload1"
Expand All @@ -36,20 +50,20 @@ class StagingInterface(metaclass=abc.ABCMeta):
"""

@abc.abstractmethod
def _post(self, api_path, payload, files_attached=False):
def _post(self, api_path: str, payload: Dict[str, Any]) -> Dict[str, Any]:
    """Make a post to the Galaxy API along supplied path.

    :param api_path: route relative to the API root (e.g. ``tools`` or
        ``tools/fetch`` as used by the sibling helpers).
    :param payload: JSON-serializable request body.
    :returns: the decoded JSON response as a dictionary.
    """

def _attach_file(self, path):
def _attach_file(self, path: str) -> BinaryIO:
return open(path, "rb")

def _tools_post(self, payload, files_attached=False):
tool_response = self._post("tools", payload, files_attached=files_attached)
def _tools_post(self, payload: Dict[str, Any]) -> Dict[str, Any]:
    """POST *payload* to the ``tools`` API endpoint and return the response.

    Each job reported in the response is handed to ``_handle_job`` for
    subclass-specific processing before the response is returned.
    """
    response = self._post("tools", payload)
    for tool_job in response.get("jobs", []):
        self._handle_job(tool_job)
    return response

def _fetch_post(self, payload, files_attached=False):
tool_response = self._post("tools/fetch", payload, files_attached=files_attached)
def _fetch_post(self, payload: Dict[str, Any]) -> Dict[str, Any]:
    """POST *payload* to the ``tools/fetch`` API endpoint.

    Mirrors ``_tools_post`` but targets the fetch API; every job listed in
    the response is dispatched to ``_handle_job`` before returning.
    """
    response = self._post("tools/fetch", payload)
    jobs = response.get("jobs", [])
    for fetch_job in jobs:
        self._handle_job(fetch_job)
    return response
Expand All @@ -60,24 +74,20 @@ def _handle_job(self, job_response):

def stage(
self,
tool_or_workflow,
history_id,
job=None,
job_path=None,
use_path_paste=LOAD_TOOLS_FROM_PATH,
to_posix_lines=True,
job_dir=".",
):
files_attached = [False]

def upload_func_fetch(upload_target):
def _attach_file(upload_payload, uri, index=0):
tool_or_workflow: Literal["tool", "workflow"],
history_id: str,
job: Dict[str, Any],
use_path_paste: bool = LOAD_TOOLS_FROM_PATH,
to_posix_lines: bool = True,
job_dir: str = ".",
) -> Tuple[Dict[str, Any], List[Dict[str, Any]]]:
def upload_func_fetch(upload_target: UploadTarget) -> Dict[str, Any]:
def _attach_file(upload_payload: Dict[str, Any], uri: str, index: int = 0) -> Dict[str, str]:
uri = path_or_uri_to_uri(uri)
is_path = uri.startswith("file://")
if not is_path or use_path_paste:
return {"src": "url", "url": uri}
else:
files_attached[0] = True
path = uri[len("file://") :]
upload_payload["__files"][f"files_{index}|file_data"] = self._attach_file(path)
return {"src": "files"}
Expand Down Expand Up @@ -131,7 +141,7 @@ def _attach_file(upload_payload, uri, index=0):
tar_path = upload_target.tar_path
src = _attach_file(fetch_payload, tar_path)
fetch_payload["targets"][0]["elements_from"] = src
else:
elif isinstance(upload_target, ObjectUploadTarget):
content = json.dumps(upload_target.object)
fetch_payload = _fetch_payload(history_id, file_type="expression.json")
fetch_payload["targets"][0]["elements"][0].update(
Expand All @@ -141,18 +151,20 @@ def _attach_file(upload_payload, uri, index=0):
}
)
tags = upload_target.properties.get("tags")
fetch_payload["targets"][0]["elements"][0]["tags"] = tags
return self._fetch_post(fetch_payload, files_attached=files_attached[0])
if tags:
fetch_payload["targets"][0]["elements"][0]["tags"] = tags
else:
raise ValueError(f"Unsupported type for upload_target: {type(upload_target)}")
return self._fetch_post(fetch_payload)

# Save legacy upload_func to target older Galaxy servers
def upload_func(upload_target):
def _attach_file(upload_payload, uri, index=0):
def upload_func(upload_target: UploadTarget) -> Dict[str, Any]:
def _attach_file(upload_payload: Dict[str, Any], uri: str, index: int = 0) -> None:
uri = path_or_uri_to_uri(uri)
is_path = uri.startswith("file://")
if not is_path or use_path_paste:
upload_payload["inputs"]["files_%d|url_paste" % index] = uri
else:
files_attached[0] = True
path = uri[len("file://") :]
upload_payload["__files"]["files_%d|file_data" % index] = self._attach_file(path)

Expand Down Expand Up @@ -187,7 +199,7 @@ def _attach_file(upload_payload, uri, index=0):
_attach_file(upload_payload, composite_data, index=i)

self._log(f"upload_payload is {upload_payload}")
return self._tools_post(upload_payload, files_attached=files_attached[0])
return self._tools_post(upload_payload)
elif isinstance(upload_target, FileLiteralTarget):
# For file literals - take them as is - never convert line endings.
payload = _upload_payload(history_id, file_type="auto", auto_decompress=False, to_posix_lines=False)
Expand All @@ -202,7 +214,7 @@ def _attach_file(upload_payload, uri, index=0):
)
upload_payload["inputs"]["files_0|auto_decompress"] = False
_attach_file(upload_payload, tar_path)
tar_upload_response = self._tools_post(upload_payload, files_attached=files_attached[0])
tar_upload_response = self._tools_post(upload_payload)
convert_payload = dict(
tool_id="CONVERTER_tar_to_directory",
tool_inputs={"input1": {"src": "hda", "id": tar_upload_response["outputs"][0]["id"]}},
Expand All @@ -211,13 +223,15 @@ def _attach_file(upload_payload, uri, index=0):
convert_response = self._tools_post(convert_payload)
assert "outputs" in convert_response, convert_response
return convert_response
else:
elif isinstance(upload_target, ObjectUploadTarget):
content = json.dumps(upload_target.object)
payload = _upload_payload(history_id, file_type="expression.json")
payload["files_0|url_paste"] = content
return self._tools_post(payload)
else:
raise ValueError(f"Unsupported type for upload_target: {type(upload_target)}")

def create_collection_func(element_identifiers, collection_type):
def create_collection_func(element_identifiers: List[Dict[str, Any]], collection_type: str) -> Dict[str, Any]:
payload = {
"name": "dataset collection",
"instance_type": "history",
Expand All @@ -228,28 +242,18 @@ def create_collection_func(element_identifiers, collection_type):
}
return self._post("dataset_collections", payload)

if job_path is not None:
assert job is None
with open(job_path) as f:
job = yaml.safe_load(f)
job_dir = os.path.dirname(os.path.abspath(job_path))
else:
assert job is not None
assert job_dir is not None

if self.use_fetch_api:
upload = upload_func_fetch
else:
upload = upload_func

job_dict, datasets = galactic_job_json(
return galactic_job_json(
job,
job_dir,
upload,
create_collection_func,
tool_or_workflow,
)
return job_dict, datasets

# extension point for planemo to override logging
def _log(self, message):
Expand All @@ -261,11 +265,11 @@ def use_fetch_api(self):


class InteractorStaging(StagingInterface):
def __init__(self, galaxy_interactor, use_fetch_api=DEFAULT_USE_FETCH_API):
def __init__(self, galaxy_interactor: "ApiTestInteractor", use_fetch_api: bool = DEFAULT_USE_FETCH_API) -> None:
    """Stage inputs through an API test interactor.

    :param galaxy_interactor: client used to issue the underlying API posts.
    :param use_fetch_api: when True, stage via the ``tools/fetch`` endpoint
        rather than the legacy ``upload1`` tool path.
    """
    # Backing field for the ``use_fetch_api`` property.
    self._use_fetch_api = use_fetch_api
    self.galaxy_interactor = galaxy_interactor

def _post(self, api_path, payload, files_attached=False):
def _post(self, api_path: str, payload: Dict[str, Any]) -> Dict[str, Any]:
    """Delegate the POST to the wrapped interactor and decode the JSON reply.

    Fails with an AssertionError (carrying the response text) when the
    server does not answer with HTTP 200.
    """
    result = self.galaxy_interactor._post(api_path, payload, json=True)
    status = result.status_code
    assert status == 200, result.text
    return result.json()
Expand All @@ -278,20 +282,22 @@ def use_fetch_api(self):
return self._use_fetch_api


def _file_path_to_name(file_path):
def _file_path_to_name(file_path: Optional[str]) -> str:
if file_path is not None:
name = os.path.basename(file_path)
else:
name = "defaultname"
return name


def _upload_payload(history_id, tool_id=UPLOAD_TOOL_ID, file_type=DEFAULT_FILE_TYPE, dbkey=DEFAULT_DBKEY, **kwd):
"""Adapted from bioblend tools client."""
payload = {}
def _upload_payload(
history_id: str, file_type: str = DEFAULT_FILE_TYPE, dbkey: str = DEFAULT_DBKEY, **kwd
) -> Dict[str, Any]:
"""Adapted from BioBlend tools client."""
payload: Dict[str, Any] = {}
payload["history_id"] = history_id
payload["tool_id"] = tool_id
tool_input = {}
payload["tool_id"] = UPLOAD_TOOL_ID
tool_input: Dict[str, Any] = {}
tool_input["file_type"] = file_type
tool_input["dbkey"] = dbkey
if not kwd.get("to_posix_lines", True):
Expand Down
Loading

0 comments on commit 6b752ca

Please sign in to comment.