Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WIP] Test LangChain And Non-Any PR #2436

Closed
wants to merge 22 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions flytekit/core/type_engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -1166,6 +1166,7 @@ def to_python_value(cls, ctx: FlyteContext, lv: Literal, expected_python_type: T
"""
Converts a Literal value with an expected python type into a python value.
"""
# print("Expected Python Type: ", expected_python_type)
transformer = cls.get_transformer(expected_python_type)
return transformer.to_python_value(ctx, lv, expected_python_type)

Expand Down Expand Up @@ -1222,9 +1223,13 @@ def literal_map_to_kwargs(
kwargs = {}
for i, k in enumerate(lm.literals):
try:
# print("converting input: ", k, " with value: ", lm.literals[k])
# print("Type 1: ", python_interface_inputs[k])
kwargs[k] = TypeEngine.to_python_value(ctx, lm.literals[k], python_interface_inputs[k])
# print("kwargs[k]:", kwargs[k])
except TypeTransformerFailedError as exc:
raise TypeTransformerFailedError(f"Error converting input '{k}' at position {i}:\n {exc}") from exc

return kwargs

@classmethod
Expand Down
39 changes: 39 additions & 0 deletions flytekit/types/pickle/pickle.py
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,46 @@ def assert_type(self, t: Type[T], v: T):
# Every type can serialize to pickle, so we don't need to check the type here.
...

# def to_python_value(self, ctx: FlyteContext, lv: Literal, expected_python_type: Type[T]) -> T:
# try:
# uri = lv.scalar.blob.uri
# return FlytePickle.from_pickle(uri)
# except Exception as e:
# from datetime import datetime, timedelta

# if lv.scalar:
# if lv.scalar.primitive:
# if lv.scalar.primitive.integer:
# return TypeEngine.to_python_value(ctx, lv, int)
# elif lv.scalar.primitive.float_value:
# return TypeEngine.to_python_value(ctx, lv, float)
# elif lv.scalar.primitive.string_value:
# return TypeEngine.to_python_value(ctx, lv, str)
# elif lv.scalar.primitive.boolean:
# return TypeEngine.to_python_value(ctx, lv, bool)
# elif lv.scalar.primitive.datetime:
# return TypeEngine.to_python_value(ctx, lv, datetime)
# elif lv.scalar.primitive.duration:
# return TypeEngine.to_python_value(ctx, lv, timedelta)
# raise None
def to_python_value(self, ctx: FlyteContext, lv: Literal, expected_python_type: Type[T]) -> T:
print("lv:", lv)
primitive = lv.scalar.primitive
if primitive:
from datetime import datetime, timedelta

type_mapping = {
"integer": int,
"float_value": float,
"string_value": str,
"boolean": bool,
"datetime": datetime,
"duration": timedelta,
}
for attr, py_type in type_mapping.items():
if getattr(primitive, attr) is not None:
return TypeEngine.to_python_value(ctx, lv, py_type)

uri = lv.scalar.blob.uri
return FlytePickle.from_pickle(uri)

Expand Down
33 changes: 33 additions & 0 deletions plugins/flytekit-langchain/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
# Flytekit Airflow Plugin
Airflow plugin allows you to seamlessly run Airflow tasks in the Flyte workflow without changing any code.

- Compile Airflow tasks to Flyte tasks
- Use Airflow sensors/operators in Flyte workflows
- Add support running Airflow tasks locally without running a cluster

## Example
```python
from airflow.sensors.filesystem import FileSensor
from flytekit import task, workflow

@task()
def t1():
print("flyte")


@workflow
def wf():
sensor = FileSensor(task_id="id", filepath="/tmp/1234")
sensor >> t1()


if __name__ == '__main__':
wf()
```


To install the plugin, run the following command:

```bash
pip install flytekitplugins-airflow
```
Empty file.
Loading
Loading