Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FlyteClient][FlyteDeck] Get Downloaded Artifact Signed URL via Data Proxy #2777

Merged
Merged
Show file tree
Hide file tree
Changes from 9 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 32 additions & 0 deletions flytekit/clients/friendly.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,9 @@
from flyteidl.admin import task_pb2 as _task_pb2
from flyteidl.admin import workflow_attributes_pb2 as _workflow_attributes_pb2
from flyteidl.admin import workflow_pb2 as _workflow_pb2
from flyteidl.core import identifier_pb2 as _identifier_pb2
from flyteidl.service import dataproxy_pb2 as _data_proxy_pb2
from flyteidl.service.dataproxy_pb2 import ARTIFACT_TYPE_DECK
from google.protobuf.duration_pb2 import Duration

from flytekit.clients.raw import RawSynchronousFlyteClient as _RawSynchronousFlyteClient
Expand Down Expand Up @@ -1046,3 +1048,33 @@ def get_data(self, flyte_uri: str) -> _data_proxy_pb2.GetDataResponse:

resp = self._dataproxy_stub.GetData(req, metadata=self._metadata)
return resp

def get_download_deck_signed_url(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you think we need to change the function name to get_download_signed_url and add ArtifactType to the input args?
cc @wild-endeavor @eapolinario

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yeah. What if we set the default artifact_type to ARTIFACT_TYPE_DECK?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes I can do that

self,
node_id: str,
project: str,
domain: str,
name: str,
expires_in: datetime.timedelta = None,
) -> _data_proxy_pb2.CreateDownloadLinkResponse:
"""
This is a new API for flyte and union cluster to get the signed url for the deck artifact.
"""
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
self,
node_id: str,
project: str,
domain: str,
name: str,
expires_in: datetime.timedelta = None,
) -> _data_proxy_pb2.CreateDownloadLinkResponse:
"""
This is a new API for flyte and union cluster to get the signed url for the deck artifact.
"""
self,
node_id: str,
project: str,
domain: str,
name: str,
artifact_type: _data_proxy_pb2.ArtifactType = ARTIFACT_TYPE_DECK,
expires_in: datetime.timedelta = None,
) -> _data_proxy_pb2.CreateDownloadLinkResponse:
"""
Get a signed url for an artifact.
:param node_id: Node id associated with artifact
:param project: Name of the project the resource belongs to
:param domain: Name of the domain the resource belongs to
:param name: User or system provided value for the resource
:param artifact_type: ArtifactType of the artifact requested
:param expires_in: If provided this defines a requested expiration duration for the generated url
:rtype: flyteidl.service.dataproxy_pb2.CreateDownloadLinkResponse
"""

expires_in_pb = None
if expires_in:
expires_in_pb = Duration()
expires_in_pb.FromTimedelta(expires_in)
return super(SynchronousFlyteClient, self).create_download_link(
_data_proxy_pb2.CreateDownloadLinkRequest(
artifact_type=ARTIFACT_TYPE_DECK,
node_execution_id=_identifier_pb2.NodeExecutionIdentifier(
node_id=node_id,
execution_id=_identifier_pb2.WorkflowExecutionIdentifier(
project=project,
domain=domain,
name=name,
),
),
expires_in=expires_in_pb,
)
)
21 changes: 21 additions & 0 deletions tests/flytekit/integration/remote/test_remote.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@
from flytekit.extras.sqlite3.task import SQLite3Config, SQLite3Task
from flytekit.remote.remote import FlyteRemote
from flytekit.types.schema import FlyteSchema
from flytekit.clients.friendly import SynchronousFlyteClient as _SynchronousFlyteClient
from flytekit.configuration import PlatformConfig

MODULE_PATH = pathlib.Path(__file__).parent / "workflows/basic"
CONFIG = os.environ.get("FLYTECTL_CONFIG", str(pathlib.Path.home() / ".flyte" / "config-sandbox.yaml"))
Expand Down Expand Up @@ -99,6 +101,25 @@ def test_fetch_execute_launch_plan(register):
assert execution.outputs["o0"] == "hello world"


def test_get_download_deck_signed_url(register):
remote = FlyteRemote(Config.auto(config_file=CONFIG), PROJECT, DOMAIN)
flyte_launch_plan = remote.fetch_launch_plan(name="basic.basic_workflow.my_wf", version=VERSION)
execution = remote.execute(flyte_launch_plan, inputs={"a": 10, "b": "foobar"}, wait=True)
project, domain, name = execution.id.project, execution.id.domain, execution.id.name

# Fetch the download deck signed URL for the execution
client = _SynchronousFlyteClient(PlatformConfig.for_endpoint("localhost:30080", True))
download_link_response = client.get_download_deck_signed_url(
node_id="n0", # Assuming node_id is "n0"
project=project,
domain=domain,
name=name,
)

# Check if the signed URL is valid and starts with the expected prefix
signed_url = download_link_response.signed_url[0]
assert signed_url.startswith(f"http://localhost:30002/my-s3-bucket/metadata/propeller/{project}-{domain}-{name}/n0/data/0/deck.html")

def test_fetch_execute_launch_plan_with_args(register):
remote = FlyteRemote(Config.auto(config_file=CONFIG), PROJECT, DOMAIN)
flyte_launch_plan = remote.fetch_launch_plan(name="basic.basic_workflow.my_wf", version=VERSION)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
from flytekit import task, workflow


@task
@task(enable_deck=True)
def t1(a: int) -> typing.NamedTuple("OutputsBC", t1_int_output=int, c=str):
return a + 2, "world"

Expand Down
Loading