Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FlyteClient][FlyteDeck] Get Downloaded Artifact Signed URL via Data Proxy #2777

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 41 additions & 0 deletions flytekit/clients/friendly.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,9 @@
from flyteidl.admin import task_pb2 as _task_pb2
from flyteidl.admin import workflow_attributes_pb2 as _workflow_attributes_pb2
from flyteidl.admin import workflow_pb2 as _workflow_pb2
from flyteidl.core import identifier_pb2 as _identifier_pb2
from flyteidl.service import dataproxy_pb2 as _data_proxy_pb2
from flyteidl.service.dataproxy_pb2 import ARTIFACT_TYPE_DECK
from google.protobuf.duration_pb2 import Duration

from flytekit.clients.raw import RawSynchronousFlyteClient as _RawSynchronousFlyteClient
Expand Down Expand Up @@ -1046,3 +1048,42 @@ def get_data(self, flyte_uri: str) -> _data_proxy_pb2.GetDataResponse:

resp = self._dataproxy_stub.GetData(req, metadata=self._metadata)
return resp

def get_download_artifact_signed_url(
self,
node_id: str,
project: str,
domain: str,
name: str,
artifact_type: _data_proxy_pb2.ArtifactType = ARTIFACT_TYPE_DECK,
expires_in: datetime.timedelta = None,
) -> _data_proxy_pb2.CreateDownloadLinkResponse:
"""
Get a signed url for an artifact.

:param node_id: Node id associated with artifact
:param project: Name of the project the resource belongs to
:param domain: Name of the domain the resource belongs to
:param name: User or system provided value for the resource
:param artifact_type: ArtifactType of the artifact requested
:param expires_in: If provided this defines a requested expiration duration for the generated url
:rtype: flyteidl.service.dataproxy_pb2.CreateDownloadLinkResponse
"""
expires_in_pb = None
if expires_in:
expires_in_pb = Duration()
expires_in_pb.FromTimedelta(expires_in)
return super(SynchronousFlyteClient, self).create_download_link(
_data_proxy_pb2.CreateDownloadLinkRequest(
artifact_type=artifact_type,
node_execution_id=_identifier_pb2.NodeExecutionIdentifier(
node_id=node_id,
execution_id=_identifier_pb2.WorkflowExecutionIdentifier(
project=project,
domain=domain,
name=name,
),
),
expires_in=expires_in_pb,
)
)
23 changes: 23 additions & 0 deletions tests/flytekit/integration/remote/test_remote.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,10 @@
from flytekit.exceptions.user import FlyteAssertion, FlyteEntityNotExistException
from flytekit.extras.sqlite3.task import SQLite3Config, SQLite3Task
from flytekit.remote.remote import FlyteRemote
from flyteidl.service import dataproxy_pb2 as _data_proxy_pb2
from flytekit.types.schema import FlyteSchema
from flytekit.clients.friendly import SynchronousFlyteClient as _SynchronousFlyteClient
from flytekit.configuration import PlatformConfig

MODULE_PATH = pathlib.Path(__file__).parent / "workflows/basic"
CONFIG = os.environ.get("FLYTECTL_CONFIG", str(pathlib.Path.home() / ".flyte" / "config-sandbox.yaml"))
Expand Down Expand Up @@ -99,6 +102,26 @@ def test_fetch_execute_launch_plan(register):
assert execution.outputs["o0"] == "hello world"


def test_get_download_artifact_signed_url(register):
remote = FlyteRemote(Config.auto(config_file=CONFIG), PROJECT, DOMAIN)
flyte_launch_plan = remote.fetch_launch_plan(name="basic.basic_workflow.my_wf", version=VERSION)
execution = remote.execute(flyte_launch_plan, inputs={"a": 10, "b": "foobar"}, wait=True)
project, domain, name = execution.id.project, execution.id.domain, execution.id.name

# Fetch the download deck signed URL for the execution
client = _SynchronousFlyteClient(PlatformConfig.for_endpoint("localhost:30080", True))
download_link_response = client.get_download_artifact_signed_url(
node_id="n0", # Assuming node_id is "n0"
project=project,
domain=domain,
name=name,
artifact_type=_data_proxy_pb2.ARTIFACT_TYPE_DECK,
)

# Check if the signed URL is valid and starts with the expected prefix
signed_url = download_link_response.signed_url[0]
assert signed_url.startswith(f"http://localhost:30002/my-s3-bucket/metadata/propeller/{project}-{domain}-{name}/n0/data/0/deck.html")

def test_fetch_execute_launch_plan_with_args(register):
remote = FlyteRemote(Config.auto(config_file=CONFIG), PROJECT, DOMAIN)
flyte_launch_plan = remote.fetch_launch_plan(name="basic.basic_workflow.my_wf", version=VERSION)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
from flytekit import task, workflow


@task
@task(enable_deck=True)
def t1(a: int) -> typing.NamedTuple("OutputsBC", t1_int_output=int, c=str):
return a + 2, "world"

Expand Down
Loading