Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add validate_flow operation to FlowsClient #979

Merged
merged 2 commits into from
Apr 25, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions changelog.d/20240422_093018_ada_sc_33101.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
Added
~~~~~

- Added support to ``FlowsClient`` for the ``validate_flow`` operation of the
Globus Flows service. (:pr:`NUMBER`)
115 changes: 115 additions & 0 deletions src/globus_sdk/_testing/data/flows/validate_flow.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
from responses import matchers

from globus_sdk._testing.models import RegisteredResponse, ResponseSet

VALIDATE_SIMPLE_FLOW_DEFINITION = {
"Comment": "Simple flow",
"StartAt": "Step1",
"States": {
"Step1": {
"Type": "Action",
"ActionUrl": "https://transfer.actions.globus.org/transfer",
"Parameters": {
"source_endpoint.$": "$.source_endpoint_id",
"destination_endpoint.$": "$.destination_endpoint_id",
"DATA": [
{
"source_path.$": "$.source_path",
"destination_path.$": "$.destination_path",
}
],
},
"ResultPath": "$.TransferResult",
"End": True,
}
},
}

VALIDATE_SIMPLE_SUCCESS_RESPONSE = {
"scopes": {"User": ["urn:globus:auth:scope:transfer.api.globus.org:all"]}
}

VALIDATE_INVALID_FLOW_DEFINITION = {
"Comment": "Simple flow",
"StartAt": "Step1",
"States": {
"Step1": {
"Type": "Action",
"ActionUrl": "https://transfer.actions.globus.org/transfer",
"Parameters": {
"source_endpoint.$": "$.source_endpoint_id",
"destination_endpoint.$": "$.destination_endpoint_id",
"DATA": [
{
"source_path.$": "$.source_path",
"destination_path.$": "$.destination_path",
}
],
},
"ResultPath": "$.TransferResult",
}
},
}

VALIDATE_INVALID_RESPONSE = {
"error": {
"code": "UNPROCESSABLE_ENTITY",
"detail": [
{
"loc": ["definition", "States", "Step1"],
"msg": (
"A state of type 'Action' must be defined as either terminal "
'("End": true) or transitional ("Next": "NextStateId")'
),
"type": "value_error",
}
],
"message": (
"1 validation error in body. $.definition.States.Step1: A state of "
"type 'Action' must be defined as either terminal (\"End\": true) "
'or transitional ("Next": "NextStateId")'
),
},
"debug_id": "41267e70-6788-4316-8b67-df7160166466",
}

_validate_simple_flow_request = {
"definition": VALIDATE_SIMPLE_FLOW_DEFINITION,
}

_validate_invalid_flow_request = {
"definition": VALIDATE_INVALID_FLOW_DEFINITION,
}

RESPONSES = ResponseSet(
metadata={
"success": VALIDATE_SIMPLE_FLOW_DEFINITION,
"invalid": VALIDATE_INVALID_FLOW_DEFINITION,
},
default=RegisteredResponse(
service="flows",
path="/flows/validate",
method="POST",
status=200,
json=VALIDATE_SIMPLE_SUCCESS_RESPONSE,
match=[
matchers.json_params_matcher(
params={"definition": VALIDATE_SIMPLE_FLOW_DEFINITION},
strict_match=False,
)
],
),
definition_error=RegisteredResponse(
service="flows",
path="/flows/validate",
method="POST",
status=422,
json=VALIDATE_INVALID_RESPONSE,
match=[
matchers.json_params_matcher(
params={"definition": VALIDATE_INVALID_FLOW_DEFINITION},
strict_match=False,
)
],
),
)
82 changes: 67 additions & 15 deletions src/globus_sdk/services/flows/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,14 +46,14 @@ def create_flow(
additional_fields: dict[str, t.Any] | None = None,
) -> GlobusHTTPResponse:
"""
Create a Flow
Create a flow
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

😬

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We need a style-guide for this. We started using emphasis (boldface, IIRC?) in some of the docs for flow, run, timer, and other proper nouns which are easily confused with service names.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, I think the way I'd phrase it is roughly:

Entities (e.g., flow, run, action, action provider) are lowercase and first appearance is set off as a "term" in whatever paradigm a given document uses; for the docs site, we use bold

Is there such a paradigm—or related prior-art—in the SDK docs? If so, happy to make consistent!


:param title: A non-unique, human-friendly name used for displaying the
flow to end users. (1 - 128 characters)
:param definition: JSON object specifying flows states and execution order. For
a more detailed explanation of the flow definition, see
`Authoring Flows <https://docs.globus.org/api/flows/authoring-flows>`_
:param input_schema: A JSON Schema to which Flow Invocation input must conform
:param input_schema: A JSON Schema to which flow run input must conform
:param subtitle: A concise summary of the flow’s purpose. (0 - 128 characters)
:param description: A detailed description of the flow's purpose for end user
display. (0 - 4096 characters)
Expand All @@ -75,7 +75,7 @@ def create_flow(


:param flow_starters: A set of Principal URN values, or the value
"all_authenticated_users", indicating entities who can initiate a *Run* of
"all_authenticated_users", indicating entities who can initiate a *run* of
the flow

.. dropdown:: Example Values
Expand Down Expand Up @@ -174,9 +174,9 @@ def get_flow(
*,
query_params: dict[str, t.Any] | None = None,
) -> GlobusHTTPResponse:
"""Retrieve a Flow by ID
"""Retrieve a flow by ID

:param flow_id: The ID of the Flow to fetch
:param flow_id: The ID of the flow to fetch
:param query_params: Any additional parameters to be passed through
as query params.

Expand Down Expand Up @@ -205,10 +205,10 @@ def list_flows(
query_params: dict[str, t.Any] | None = None,
) -> IterableFlowsResponse:
"""
List deployed Flows
List deployed flows

:param filter_role: A role name specifying the minimum permissions required for
a Flow to be included in the response.
a flow to be included in the response.
:param filter_fulltext: A string to use in a full-text search to filter results
:param orderby: A criterion for ordering flows in the listing
:param marker: A marker for pagination
Expand Down Expand Up @@ -329,18 +329,18 @@ def update_flow(
additional_fields: dict[str, t.Any] | None = None,
) -> GlobusHTTPResponse:
"""
Update a Flow
Update a flow

Only the parameter `flow_id` is required.
Any fields omitted from the request will be unchanged

:param flow_id: The ID of the Flow to fetch
:param flow_id: The ID of the flow to fetch
:param title: A non-unique, human-friendly name used for displaying the
flow to end users. (1 - 128 characters)
:param definition: JSON object specifying flows states and execution order. For
a more detailed explanation of the flow definition, see
`Authoring Flows <https://docs.globus.org/api/flows/authoring-flows>`_
:param input_schema: A JSON Schema to which Flow Invocation input must conform
:param input_schema: A JSON Schema to which flow run input must conform
:param subtitle: A concise summary of the flow’s purpose. (0 - 128 characters)
:param description: A detailed description of the flow's purpose for end user
display. (0 - 4096 characters)
Expand All @@ -363,7 +363,7 @@ def update_flow(
]

:param flow_starters: A set of Principal URN values, or the value
"all_authenticated_users", indicating entities who can initiate a *Run* of
"all_authenticated_users", indicating entities who can initiate a *run* of
the flow

.. dropdown:: Example Values
Expand Down Expand Up @@ -450,7 +450,7 @@ def delete_flow(
*,
query_params: dict[str, t.Any] | None = None,
) -> GlobusHTTPResponse:
"""Delete a Flow
"""Delete a flow

:param flow_id: The ID of the flow to delete
:param query_params: Any additional parameters to be passed through
Expand All @@ -470,6 +470,58 @@ def delete_flow(

return self.delete(f"/flows/{flow_id}", query_params=query_params)

def validate_flow(
self,
definition: dict[str, t.Any],
input_schema: dict[str, t.Any] | MissingType = MISSING,
) -> GlobusHTTPResponse:
"""
Validate a flow

:param definition: JSON object specifying flow states and execution order. For
a more detailed explanation of the flow definition, see
`Authoring Flows <https://docs.globus.org/api/flows/authoring-flows>`_
:param input_schema: A JSON Schema to which flow run input must conform

.. tab-set::

.. tab-item:: Example Usage

.. code-block:: python

from globus_sdk import FlowsClient

...
flows = FlowsClient(...)
flows.validate_flow(
definition={
"StartAt": "the-one-true-state",
"States": {"the-one-true-state": {"Type": "Pass", "End": True}},
},
input_schema={
"type": "object",
"properties": {
"input-a": {"type": "string"},
"input-b": {"type": "number"},
"input-c": {"type": "boolean"},
},
},
)

.. tab-item:: Example Response Data

.. expandtestfixture:: flows.validate_flow

.. tab-item:: API Info

.. extdoclink:: Validate Flow
:service: flows
:ref: Flows/paths/~1flows~1validate/post
""" # noqa E501

data = {"definition": definition, "input_schema": input_schema}
return self.post("/flows/validate", data=data)

@paging.has_paginator(paging.MarkerPaginator, items_key="runs")
def list_runs(
self,
Expand All @@ -481,7 +533,7 @@ def list_runs(
"""
List all runs.

:param filter_flow_id: One or more Flow IDs used to filter the results
:param filter_flow_id: One or more flow IDs used to filter the results
:param marker: A pagination marker, used to get the next page of results.
:param query_params: Any additional parameters to be passed through

Expand Down Expand Up @@ -575,7 +627,7 @@ def get_run(
query_params: dict[str, t.Any] | None = None,
) -> GlobusHTTPResponse:
"""
Retrieve information about a particular Run of a Flow
Retrieve information about a particular run of a flow

:param run_id: The ID of the run to get
:param include_flow_description: If set to true, the lookup will attempt to
Expand Down Expand Up @@ -775,7 +827,7 @@ def delete_run(self, run_id: UUIDLike) -> GlobusHTTPResponse:

class SpecificFlowClient(client.BaseClient):
r"""
Client for interacting with a specific Globus Flow through the Flows API.
Client for interacting with a specific flow through the Globus Flows API.

Unlike other client types, this must be provided with a specific flow id. All other
arguments are the same as those for :class:`~globus_sdk.BaseClient`.
Expand Down
46 changes: 46 additions & 0 deletions tests/functional/services/flows/test_flow_validate.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
import json

import pytest

from globus_sdk import FlowsAPIError
from globus_sdk._testing import get_last_request, load_response


@pytest.mark.parametrize("input_schema", [None, {}])
def test_validate_flow(flows_client, input_schema):
metadata = load_response(flows_client.validate_flow).metadata

# Prepare the payload
payload = {"definition": metadata["success"]}
if input_schema is not None:
payload["input_schema"] = input_schema

resp = flows_client.validate_flow(**payload)
assert resp.data["scopes"] == {
"User": ["urn:globus:auth:scope:transfer.api.globus.org:all"]
}

# Check what was actually sent
last_req = get_last_request()
req_body = json.loads(last_req.body)
# Ensure the input schema is not sent if omitted
assert req_body == payload


def test_validate_flow_error_parsing(flows_client):
metadata = load_response(
flows_client.validate_flow, case="definition_error"
).metadata

# Make sure we get an error response
with pytest.raises(FlowsAPIError) as excinfo:
flows_client.validate_flow(definition=metadata["invalid"])

err = excinfo.value
assert err.code == "UNPROCESSABLE_ENTITY"
assert err.messages == [
(
"A state of type 'Action' must be defined as either terminal "
'("End": true) or transitional ("Next": "NextStateId")'
),
]
Loading