Offload literals #2872
Changes from 11 commits
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -12,7 +12,7 @@ | |
import traceback | ||
import warnings | ||
from sys import exit | ||
from typing import Callable, List, Optional | ||
from typing import Callable, Dict, List, Optional | ||
|
||
import click | ||
from flyteidl.core import literals_pb2 as _literals_pb2 | ||
|
@@ -137,7 +137,41 @@ def _dispatch_execute( | |
logger.warning("Task produces no outputs") | ||
output_file_dict = {_constants.OUTPUT_FILE_NAME: _literal_models.LiteralMap(literals={})} | ||
elif isinstance(outputs, _literal_models.LiteralMap): | ||
output_file_dict = {_constants.OUTPUT_FILE_NAME: outputs} | ||
offloaded_literals: Dict[str, _literal_models.Literal] = {} | ||
literal_map_copy = {} | ||
|
||
min_offloaded_size = int(os.environ.get("FK_L_MIN_SIZE_MB", "10")) * 1024 * 1024 | ||
max_offloaded_size = int(os.environ.get("FK_L_MAX_SIZE_MB", "1000")) * 1024 * 1024 | ||
|
||
# Go over each output and create a separate offloaded in case its size is too large | ||
for k, v in outputs.literals.items(): | ||
lit = v.to_flyte_idl() | ||
if lit.ByteSize() >= max_offloaded_size: | ||
raise ValueError( | ||
f"Literal {k} is too large to be offloaded. Max literal size is {max_offloaded_size} whereas the literal size is {lit.ByteSize()} bytes" | ||
) | ||
|
||
if lit.ByteSize() >= min_offloaded_size: | ||
logger.debug(f"Literal {k} is too large to be inlined, offloading to metadata bucket") | ||
|
||
# TODO: hash calculation | ||
Review comments (conversation marked as resolved by eapolinario):
+1
How will this hash interoperate with hashmethod as specified by the user? How does caching work again?
Since the hash calculation in entrypoint only happens after dispatch_execute (in other words, if a user defined a
||
|
||
offloaded_filename = f"{k}_offloaded_metadata.pb" | ||
|
||
offloaded_literal = _literal_models.Literal( | ||
offloaded_metadata=_literal_models.LiteralOffloadedMetadata( | ||
uri=f"{ctx.user_space_params.output_metadata_prefix}/{offloaded_filename}", | ||
size_bytes=lit.ByteSize(), | ||
# TODO: do I have to set the inferred literal type? | ||
Review comments:
@pmahindrakar-oss, under what conditions do we have to set this?
This is needed when we have a launch plan execution created by propeller. Code link in my comments below: https://github.com/flyteorg/flytekit/pull/2872/files#r1819666906
I commented in about separating those discussions and how we use the task typed interface to set
We do need to add the inferred type, since during consumption we validate the input type.
We're trying to get rid of
Won't we still need this in the new isInstance form? https://github.com/flyteorg/flyte/pull/5909/files#diff-af9c7a5cd03e0e860c155861edcf0b96863e8781526f51d1b13c7f9278ffa72fR415
If we get rid of the literaltypeforliteral function, and we don't need inferredtype, we should deprecate the field in the IDL. We don't want to keep it around and have future us wonder if it's necessary.
Let's separate those two discussions. We might be able to remove Also, if/when we end up removing
||
) | ||
) | ||
literal_map_copy[k] = offloaded_literal | ||
offloaded_literals[offloaded_filename] = v | ||
else: | ||
literal_map_copy[k] = v | ||
outputs = _literal_models.LiteralMap(literals=literal_map_copy) | ||
|
||
output_file_dict = {_constants.OUTPUT_FILE_NAME: outputs, **offloaded_literals} | ||
elif isinstance(outputs, _dynamic_job.DynamicJobSpec): | ||
output_file_dict = {_constants.FUTURES_FILE_NAME: outputs} | ||
else: | ||
|
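To make the branching in `_dispatch_execute` above easier to follow, here is a minimal standalone sketch of the same size-threshold split. It is not the PR's code: it uses plain `bytes` payloads in place of serialized literals, and `split_outputs` and `OffloadedRef` are hypothetical names introduced only for illustration. The file-naming scheme and the `>=` comparisons mirror the diff.

```python
from dataclasses import dataclass
from typing import Dict, Tuple, Union


@dataclass(frozen=True)
class OffloadedRef:
    """Hypothetical stand-in for LiteralOffloadedMetadata (uri + size only)."""

    uri: str
    size_bytes: int


def split_outputs(
    outputs: Dict[str, bytes],
    output_prefix: str,
    min_size: int,
    max_size: int,
) -> Tuple[Dict[str, Union[bytes, OffloadedRef]], Dict[str, bytes]]:
    """Split outputs into an inlined map and a dict of files to write separately.

    Payloads of at least max_size bytes are rejected, payloads of at least
    min_size bytes are replaced by a reference, and smaller ones stay inline.
    """
    inlined: Dict[str, Union[bytes, OffloadedRef]] = {}
    offloaded_files: Dict[str, bytes] = {}

    for key, payload in outputs.items():
        size = len(payload)
        if size >= max_size:
            raise ValueError(
                f"Output {key} is too large to be offloaded: "
                f"{size} bytes, limit is {max_size} bytes"
            )
        if size >= min_size:
            # Same naming scheme as the diff: one metadata file per output.
            filename = f"{key}_offloaded_metadata.pb"
            inlined[key] = OffloadedRef(uri=f"{output_prefix}/{filename}", size_bytes=size)
            offloaded_files[filename] = payload
        else:
            inlined[key] = payload

    return inlined, offloaded_files
```

The two returned dictionaries correspond to `literal_map_copy` and `offloaded_literals` in the diff: the first goes into the main outputs file, and each entry of the second is written as its own file.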
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -61,6 +61,9 @@ def literal_string_repr(lit: Literal) -> typing.Any: | |
return [literal_string_repr(i) for i in lit.collection.literals] | ||
if lit.map: | ||
return {k: literal_string_repr(v) for k, v in lit.map.literals.items()} | ||
if lit.offloaded_metadata: | ||
# TODO: load literal from offloaded literal? | ||
return f"Offloaded literal metadata: {lit.offloaded_metadata}" | ||
Comment on lines +64 to +66
Review comments:
Is there an example we can try to see if we need to load the literal here?
this is relevant for the
|
||
raise ValueError(f"Unknown literal type {lit}") | ||
|
||
|
||
|
Do we use the same names here that we use in propeller?
https://github.com/unionai/flyte/blob/c10b55373e9dea2bcce493f978499648c3f89d8f/flytepropeller/pkg/controller/config/config.go#L204-L207
We'll need to update the injection of environment variables to match those names.
The need for short env vars is described in flyteorg/flyte#5665.
OK with keeping the current ones.
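For reference, a small sketch of how the two short environment variables discussed in this thread could be read in one place. The variable names and defaults (`FK_L_MIN_SIZE_MB` = 10, `FK_L_MAX_SIZE_MB` = 1000) come from the diff; the helper name `read_offload_thresholds` and the min/max sanity check are assumptions added here, not part of the PR.

```python
import os
from typing import Mapping, Tuple

MB = 1024 * 1024


def read_offload_thresholds(env: Mapping[str, str] = os.environ) -> Tuple[int, int]:
    """Return (min_bytes, max_bytes) for literal offloading.

    Values are given in megabytes in the environment and converted to bytes.
    """
    min_bytes = int(env.get("FK_L_MIN_SIZE_MB", "10")) * MB
    max_bytes = int(env.get("FK_L_MAX_SIZE_MB", "1000")) * MB

    # Assumption (not in the PR): reject a configuration where the minimum
    # offload size exceeds the maximum, since no literal could be offloaded.
    if min_bytes > max_bytes:
        raise ValueError(
            f"FK_L_MIN_SIZE_MB ({min_bytes} bytes) exceeds FK_L_MAX_SIZE_MB ({max_bytes} bytes)"
        )
    return min_bytes, max_bytes
```

Passing the environment as a mapping keeps the helper easy to test without mutating `os.environ`.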