Skip to content

Commit

Permalink
Some more typing for test populators.
Browse files Browse the repository at this point in the history
  • Loading branch information
jmchilton committed Sep 19, 2022
1 parent eedc1db commit b8fe6a5
Showing 1 changed file with 25 additions and 19 deletions.
44 changes: 25 additions & 19 deletions lib/galaxy_test/base/populators.py
Original file line number Diff line number Diff line change
Expand Up @@ -1393,6 +1393,10 @@ def _summarize_history(self, history_id):
self.galaxy_interactor._summarize_history(history_id)


# Things gxformat2 knows how to upload as workflows
HasYamlT = Union[str, os.PathLike, dict]


class BaseWorkflowPopulator(BasePopulator):
dataset_populator: BaseDatasetPopulator
dataset_collection_populator: "BaseDatasetCollectionPopulator"
Expand Down Expand Up @@ -1445,7 +1449,7 @@ def create_workflow_response(self, workflow: Dict[str, Any], **create_kwds) -> R
upload_response = self._post("workflows/upload", data=data)
return upload_response

def upload_yaml_workflow(self, has_yaml, **kwds) -> str:
def upload_yaml_workflow(self, has_yaml: HasYamlT, **kwds) -> str:
round_trip_conversion = kwds.get("round_trip_format_conversion", False)
client_convert = kwds.pop("client_convert", not round_trip_conversion)
kwds["convert"] = client_convert
Expand Down Expand Up @@ -1718,18 +1722,18 @@ def export_for_update(self, workflow_id):

def run_workflow(
self,
has_workflow,
test_data=None,
history_id=None,
wait=True,
source_type=None,
has_workflow: HasYamlT,
test_data: Optional[Union[str, dict]] = None,
history_id: Optional[str] = None,
wait: bool = True,
source_type: Optional[str] = None,
jobs_descriptions=None,
expected_response=200,
assert_ok=True,
client_convert=None,
round_trip_format_conversion=False,
invocations=1,
raw_yaml=False,
expected_response: int = 200,
assert_ok: bool = True,
client_convert: Optional[bool] = None,
round_trip_format_conversion: bool = False,
invocations: int = 1,
raw_yaml: bool = False,
):
"""High-level wrapper around workflow API, etc. to invoke format 2 workflows."""
workflow_populator = self
Expand All @@ -1747,19 +1751,21 @@ def run_workflow(
if test_data is None:
if jobs_descriptions is None:
assert source_type != "path"
assert isinstance(has_workflow, str)
jobs_descriptions = yaml.safe_load(has_workflow)

test_data = jobs_descriptions.get("test_data", {})

if not isinstance(test_data, dict):
test_data = yaml.safe_load(test_data)
test_data_dict = jobs_descriptions.get("test_data", {})
elif not isinstance(test_data, dict):
test_data_dict = yaml.safe_load(test_data)
else:
test_data_dict = test_data

parameters = test_data.pop("step_parameters", {})
replacement_parameters = test_data.pop("replacement_parameters", {})
parameters = test_data_dict.pop("step_parameters", {})
replacement_parameters = test_data_dict.pop("replacement_parameters", {})
if history_id is None:
history_id = self.dataset_populator.new_history()
inputs, label_map, has_uploads = load_data_dict(
history_id, test_data, self.dataset_populator, self.dataset_collection_populator
history_id, test_data_dict, self.dataset_populator, self.dataset_collection_populator
)
workflow_request: Dict[str, Any] = dict(
history=f"hist_id={history_id}",
Expand Down

0 comments on commit b8fe6a5

Please sign in to comment.