Skip to content

Commit

Permalink
print native input/output when cache hit (#2567)
Browse files Browse the repository at this point in the history
Signed-off-by: Kevin Su <[email protected]>
  • Loading branch information
pingsutw authored Jul 8, 2024
1 parent 5ff9eb3 commit caf3139
Showing 1 changed file with 7 additions and 8 deletions.
15 changes: 7 additions & 8 deletions flytekit/core/base_task.py
Original file line number Diff line number Diff line change
Expand Up @@ -283,7 +283,7 @@ def local_execute(
# native constants are just bound to this specific task (default values for a task input)
# Also along with promises and constants, there could be dictionary or list of promises or constants
try:
kwargs = translate_inputs_to_literals(
literals = translate_inputs_to_literals(
ctx,
incoming_values=kwargs,
flyte_interface_types=self.interface.inputs,
Expand All @@ -293,20 +293,19 @@ def local_execute(
msg = f"Failed to convert inputs of task '{self.name}':\n {exc}"
logger.error(msg)
raise TypeError(msg) from exc
input_literal_map = _literal_models.LiteralMap(literals=kwargs)
input_literal_map = _literal_models.LiteralMap(literals=literals)

# if metadata.cache is set, check memoized version
local_config = LocalConfig.auto()
if self.metadata.cache and local_config.cache_enabled:
# TODO: how to get a nice `native_inputs` here?
logger.info(
f"Checking cache for task named {self.name}, cache version {self.metadata.cache_version} "
f", inputs: {input_literal_map}, and ignore input vars: {self.metadata.cache_ignore_input_vars}"
)
if local_config.cache_overwrite:
outputs_literal_map = None
logger.info("Cache overwrite, task will be executed now")
else:
logger.info(
f"Checking cache for task named {self.name}, cache version {self.metadata.cache_version} "
f", inputs: {kwargs}, and ignore input vars: {self.metadata.cache_ignore_input_vars}"
)
outputs_literal_map = LocalTaskCache.get(
self.name, self.metadata.cache_version, input_literal_map, self.metadata.cache_ignore_input_vars
)
Expand All @@ -327,7 +326,7 @@ def local_execute(
)
logger.info(
f"Cache set for task named {self.name}, cache version {self.metadata.cache_version} "
f", inputs: {input_literal_map}, and ignore input vars: {self.metadata.cache_ignore_input_vars}"
f", inputs: {kwargs}, and ignore input vars: {self.metadata.cache_ignore_input_vars}"
)
else:
# This code should mirror the call to `sandbox_execute` in the above cache case.
Expand Down

0 comments on commit caf3139

Please sign in to comment.