Skip to content

Commit

Permalink
chore: merge canals into Haystack codebase (#6422)
Browse files Browse the repository at this point in the history
* Ignore some mypy errors

* Fix I/O comparator

* Avoid calling asdict multiple times when comparing dataclasses

* Enhance component tests

* Fix I/O dataclasses comparison

* Use Any instead of type when expecting I/O dataclasses

* Fix mypy

* Change InputSocket taken_by field to sender

* Remove variadics implementation

* Adapt tests

* Enhance docs and simplify run

* Remove useless check on drawing

* Add __canals_optional_inputs__ field in components

* Rework a bit Pipeline._ready_to_run()

* Simplify some logic

* Add __canals_mandatory_inputs__ field in components

* Handle pipeline loops

* Fix tests

* Document component state run logic

* Add double loop pipeline test

* Make component decorator a class

* PR feedback

* Add error logging when registering Component with identical names

* Add 'remove' action that removes current component from Pipeline run input queue

* Simplify run checks and logging

* Better logging

* Apply suggestions from code review

Co-authored-by: ZanSara <[email protected]>

* Trim whitespace

* Add support for Union in Component's I/O

* Remove dependencies section in marshaled pipelines

* Create Component Protocol

* simpler optional deps

* Simplify component init wrapping and fix issue with save_init_params

* Update canals/pipeline/save_load.py

Co-authored-by: ZanSara <[email protected]>

* Simplify functions to find I/O sockets

* Fix import

* change import

* testing ci

* testing ci

* Simplify _save_init_params

* testing ci

* testing ci

* use direct pytest call

* trying to force old version for macos

* list macos versions

* list macos versions

* disable on macos

* remove extra

* refactor imports

* re-enable some logs

* some more tests

* small correction

* Remove unused leftover methods

* docs

* update docstring

* mention optionals

* example for dataclass initialization

* missed part

* fix api docs

* improve error reporting and testing

* add tests for Any

* parametrized tests

* fix test for py<3.10

* test type printing

* remove typing. prefix from Any (compat with Py3.11)

* test helpers

* test names

* add type_is_compatible()

* tests pass

* more tests

* add small comment

* handle Unions as anything else

* use sender/receiver for socket pairs

* more sender/receiver renames

* even more renames

* split if statement

* Update __about__.py

* fix logic operator and add tests

* Update __about__.py

* Simplify imports

* Move draw in pipeline module and clearly define public interface

* Format pyproject.toml

* Include only required files in built wheel

* Move sample components out of tests

* stub component class decorator

* update static sample components to new API

* stub

* dynamic output examples

* sum

* add components fixed

* re-add inputsocket and outputsocket creation

* fix component tests

* fixing tests

* Add methods to set I/O dinamically

* fix drawing

* fix some integration tests

* tests green

* pylint

* remove stray files

* Remove default in InputSocket and add is_optional field

* Fix drawing

* Rework sockets string representation

* Add back Component Protocol

* Simplify method to get string representation of types

* Remove sockets __str__

* Remove Component's I/O type checks at run time

* Remove IO check in init wrapper

* Update canals/utils.py

Co-authored-by: Massimiliano Pippi <[email protected]>

* Split __canals_io__ field in __canals_input__ and __canals_output__

* Order input and output fields

* Add test to verify __canals_component__ is set

* Remove empty line

* Add component class factory

* Fix API docs workflow failure

* fix api docs

* Update __about__.py

* Add component from_dict and to_dict methods

* Add Pipeline to_dict and from_dict

* Fix components tests

* Add some more tests

* Change error messages

* Simplify test_to_dict

* Add max_loops_allowed in test_to_dict

* Test non default max_loops_allowed in test_to_dict

* Rework marshal_pipelines

* Rework unmarshal_pipelines

* Rename some stuff

* allow falsy outputs

* apply falsy fix to validation

* add test for falsy inputs

* Split _cleanup_marshalled_data into two functions

* Use from_dict to deserialise component

* Remove commented out code and update variable name

* Add test to verify difference when unmarshaling Pipeline with duplicate names

* Update marshal_pipelines docstring

* update workflow

* exclude tests from mypy in pre-commit hooks

* add additional falsy tests

* remove unnecessary import

* split test into two

Co-authored-by: ZanSara <[email protected]>

* remove init_parameters decorator and fix assumptions

* fix accumulate

* stray if

* Bump version to 0.5.0

* Implement generic default_to_dict and default_from_dict

* Update default_to_dict docstring

Co-authored-by: Massimiliano Pippi <[email protected]>

* Remove all mentions of Component.defaults

* Add Remainder to_dict and from_dict (#91)

* Add Repeat to_dict and from_dict (#92)

* Add Sum to_dict and from_dict (#93)

* Add Greet to_dict and from_dict (#89)

Co-authored-by: Massimiliano Pippi <[email protected]>

* Rework Accumulate to_dict and from_dict (#86)

Co-authored-by: Massimiliano Pippi <[email protected]>

* Add to_dict and from_dict for Parity, Subtract, Double, Concatenate (#87)

* Add Concatenate to_dict and from_dict

* Add Double to_dict and from_dict

* Add Subtract to_dict and from_dict

* Add Parity to_dict and from_dict

---------

Co-authored-by: Massimiliano Pippi <[email protected]>

* Change _to_mermaid_text to use component serialization data (#94)

* Add MergeLoop to_dict and from_dict (#90)

Co-authored-by: Massimiliano Pippi <[email protected]>

* Add Threshold to_dict and from_dict (#97)

* Add AddFixedValue to_dict and from_dict (#88)

Co-authored-by: Massimiliano Pippi <[email protected]>

* Remove BaseTestComponent (#99)

* Change @component decorator so it doesn't add default to_dict and from_dict (#98)

* Rename some classes in tests to suppress Pytest warnings (#101)

* Check Component I/O socket names are valid (#100)

* Remove handling of shared component instances on Pipeline serialization (#102)

* Fix docs

* Bump version to 0.6.0

* Revert "Check Component I/O socket names are valid (#100)" (#103)

This reverts commit 4529874.

* Bump canals to 0.7.0

* Downgrade log from ERROR to DEBUG (#104)

* Make to/from_dict optional (#107)

* remove from/to dict from Protocol

* use a default marshaller

* example component with no serializers

* fix linting

* make it smarter

* fix linting

* thank you mypy protector of the dumb programmers

* feat: check returned dictionary (#106)

* better error message if components don't return dictionaries

* add test

* use factory

* needless import

* Update __about__.py

* fix default serialization and adjust sample components accordingly (#109)

* fix default serialization and adjust sample components accordingly

* typo

* fix pylint errors

* fix: `draw` function vs init parameters (#115)

* fix draw

* stray print

* Update version (#118)

* remove extras

* Revert "remove extras"

This reverts commit a096ff8.

* fix package name, change _parse_connection_name function name, add tests (#126)

* move sockets into components package (#127)

* chore: remove extras (#125)

* remove extras

* workflow

* typo

* fix: Sockets named "text/plain" or containing a "/" fail during pipeline.to_dict (#131)

* don't split sockets by /

* revert hashing edge keys

* docs: remove missing module from docs (#132)

* remove stray print (#123)

* addo sockets docs (#133)

* tidy up utils about types (#129)

* Update canals.md (#134)

* rename module in API docs

* make `__canals_output__` and `__canals_input__` management consistent  (#128)

* make __canals_output__ and __canals_input__ management consistent and assign them to the component instance

* make pylint happy

* return the original type instead of the metaclass

* use type checking instead of instance field

* declare the actual returned type

* fix after conflict resolution

* remove check

* Do not use a dict as intermediate format and use `Socket`s directly (#135)

* do not use a dict as intermediate format and use sockets directly to simplify code and remove side effects

* fix leftover from cherry-pick

* move is_optional evaluation for InputSocket to post_init (#136)

* re-introduce variadics to support Joiner node (#122)

* move sockets into components package

make __canals_output__ and __canals_input__ management consistent and assign them to the component instance

do not use a dict as intermediate format and use sockets directly to simplify code and remove side effects

move is_optional evaluation for InputSocket to post_init

re-introduce variadics to support Joiner node

restore connection-time check

use custom type annotation, fix tests

* fix leftovers from rebase

* rename fan-in to joiner

* clean up and fix typing

* let inputs arrive later

* address review comments

* address review comments

* fix docstrings

* try

* try

* fix run input

* linting

* remove comments

* fix pylint

* bumb version to 0.9.0 (#140)

* properly annotate classmethods (#139)

* feat: add `Pipeline.inputs()` (#120)

* add Pipeline.describe_input()

* add tests

* split dict and str outputs and add to error messages

* tests

* accepts/expects

* move methods

* fix tests

* fix module name

* tests

* review feedback

* Add missing typing_extensions dependency (#152)

* feat: use full connection data to route I/O (#148)

* fix sample components

* make sum variadic

* separate queue and buffer

* all works but loops & variadics together

* fix some tests

* fix some tests

* all tests green

* clean up code a bit

* refactor code

* fix tests

* fix self loops

* fix reused sockets bug

* add distinct loops

* add distinct loops test

* break out some code from run()

* docstring

* improve variadics drawing

* black

* document the deepcopy

* re-arrange connection dataclass and add tests

* consumer -> receiver

* fix typing

* move Connection-related code under component package

* clean up connect()

* cosmetics and typing

* fix linter, make Connection a dataclass again

* fix typing

* add test case for #105

---------

Co-authored-by: Massimiliano Pippi <[email protected]>

* feat: Add Component inputs/outputs functions (#158)

* Add component inputs/outputs methods

* Different impl approach

* Black fixes

* Rename functions to match naming in pipeline inputs/ouputs

* Fix find_component_inputs, update unit tests (#162)

* Fix API docs (#164)

* make Variadic wrap an iterable (#163)

* Add pipeline outputs method (#150)

Co-authored-by: ZanSara <[email protected]>

* Update __about__.py (#165)

Update version to 0.10.0

* add CODEOWNERS

* feat: read defaults from `run()` signature (#166)

* Read defaults from run signature

* simplify setting of sockets

* fix test

* Update sample_components/fstring.py

Co-authored-by: Massimiliano Pippi <[email protected]>

* Update canals/component/component.py

Co-authored-by: Massimiliano Pippi <[email protected]>

* dostring

---------

Co-authored-by: Massimiliano Pippi <[email protected]>

* Use full import path as 'type' in serialization.  (#167)

* Use full import path as 'type' in serialization. Try to import the path when deserializing

* fix test data

* add from_dict test

* remove leftover

* Update canals/pipeline/pipeline.py

Co-authored-by: ZanSara <[email protected]>

* add error message to PipelineError

---------

Co-authored-by: ZanSara <[email protected]>

* bump version

* fix: copy input values before passing them down pipeline.run (#168)

* copy input values before passing them down pipeline.run

* Update test_mutable_inputs.py

* fix mypy and pyright (#169)

* bump version

* remove data we won't keep

* reformat

* try

* skip tests on transient code

---------

Co-authored-by: Silvano Cerza <[email protected]>
Co-authored-by: Silvano Cerza <[email protected]>
Co-authored-by: ZanSara <[email protected]>
Co-authored-by: Michel Bartels <[email protected]>
Co-authored-by: ZanSara <[email protected]>
Co-authored-by: Julian Risch <[email protected]>
Co-authored-by: Julian Risch <[email protected]>
Co-authored-by: Vladimir Blagojevic <[email protected]>
  • Loading branch information
9 people authored Nov 27, 2023
1 parent db759b0 commit 011e32e
Show file tree
Hide file tree
Showing 89 changed files with 5,672 additions and 0 deletions.
1 change: 1 addition & 0 deletions .github/workflows/linting.yml
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ on:
- "test/**/*.py"
- "e2e/**/*.py"
- "pyproject.toml"
- "!haystack/core/**/*.py"

env:
PYTHON_VERSION: "3.8"
Expand Down
2 changes: 2 additions & 0 deletions .github/workflows/tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ on:
- main
# release branches have the form v1.9.x
- "v[0-9].*[0-9].x"
- "!haystack/core/**/*.py"
pull_request:
types:
- opened
Expand All @@ -17,6 +18,7 @@ on:
paths:
- "haystack/**/*.py"
- "test/**/*.py"
- "!haystack/core/**/*.py"

env:
OPENAI_API_KEY: ${{ secrets.OPENAI_API_KEY }}
Expand Down
4 changes: 4 additions & 0 deletions haystack/core/canals/__about__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
# SPDX-FileCopyrightText: 2022-present deepset GmbH <[email protected]>
#
# SPDX-License-Identifier: Apache-2.0
__version__ = "0.11.0"
9 changes: 9 additions & 0 deletions haystack/core/canals/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
# SPDX-FileCopyrightText: 2022-present deepset GmbH <[email protected]>
#
# SPDX-License-Identifier: Apache-2.0
from canals.__about__ import __version__

from canals.component import component, Component
from canals.pipeline.pipeline import Pipeline

__all__ = ["component", "Component", "Pipeline"]
7 changes: 7 additions & 0 deletions haystack/core/canals/component/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
# SPDX-FileCopyrightText: 2022-present deepset GmbH <[email protected]>
#
# SPDX-License-Identifier: Apache-2.0
from canals.component.component import component, Component
from canals.component.sockets import InputSocket, OutputSocket

__all__ = ["component", "Component", "InputSocket", "OutputSocket"]
307 changes: 307 additions & 0 deletions haystack/core/canals/component/component.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,307 @@
# SPDX-FileCopyrightText: 2022-present deepset GmbH <[email protected]>
#
# SPDX-License-Identifier: Apache-2.0
"""
Attributes:
component: Marks a class as a component. Any class decorated with `@component` can be used by a Pipeline.
All components must follow the contract below. This docstring is the source of truth for components contract.
<hr>
`@component` decorator
All component classes must be decorated with the `@component` decorator. This allows Canals to discover them.
<hr>
`__init__(self, **kwargs)`
Optional method.
Components may have an `__init__` method where they define:
- `self.init_parameters = {same parameters that the __init__ method received}`:
In this dictionary you can store any state the components wish to be persisted when they are saved.
These values will be given to the `__init__` method of a new instance when the pipeline is loaded.
Note that by default the `@component` decorator saves the arguments automatically.
However, if a component sets their own `init_parameters` manually in `__init__()`, that will be used instead.
Note: all of the values contained here **must be JSON serializable**. Serialize them manually if needed.
Components should take only "basic" Python types as parameters of their `__init__` function, or iterables and
dictionaries containing only such values. Anything else (objects, functions, etc) will raise an exception at init
time. If there's the need for such values, consider serializing them to a string.
_(TODO explain how to use classes and functions in init. In the meantime see `test/components/test_accumulate.py`)_
The `__init__` must be extrememly lightweight, because it's a frequent operation during the construction and
validation of the pipeline. If a component has some heavy state to initialize (models, backends, etc...) refer to
the `warm_up()` method.
<hr>
`warm_up(self)`
Optional method.
This method is called by Pipeline before the graph execution. Make sure to avoid double-initializations,
because Pipeline will not keep track of which components it called `warm_up()` on.
<hr>
`run(self, data)`
Mandatory method.
This is the method where the main functionality of the component should be carried out. It's called by
`Pipeline.run()`.
When the component should run, Pipeline will call this method with an instance of the dataclass returned by the
method decorated with `@component.input`. This dataclass contains:
- all the input values coming from other components connected to it,
- if any is missing, the corresponding value defined in `self.defaults`, if it exists.
`run()` must return a single instance of the dataclass declared through the method decorated with
`@component.output`.
"""

import logging
import inspect
from typing import Protocol, runtime_checkable, Any
from types import new_class
from copy import deepcopy

from canals.component.sockets import InputSocket, OutputSocket
from canals.errors import ComponentError

logger = logging.getLogger(__name__)


@runtime_checkable
class Component(Protocol):
"""
Note this is only used by type checking tools.
In order to implement the `Component` protocol, custom components need to
have a `run` method. The signature of the method and its return value
won't be checked, i.e. classes with the following methods:
def run(self, param: str) -> Dict[str, Any]:
...
and
def run(self, **kwargs):
...
will be both considered as respecting the protocol. This makes the type
checking much weaker, but we have other places where we ensure code is
dealing with actual Components.
The protocol is runtime checkable so it'll be possible to assert:
isinstance(MyComponent, Component)
"""

def run(self, *args: Any, **kwargs: Any): # pylint: disable=missing-function-docstring
...


class ComponentMeta(type):
def __call__(cls, *args, **kwargs):
"""
This method is called when clients instantiate a Component and
runs before __new__ and __init__.
"""
# This will call __new__ then __init__, giving us back the Component instance
instance = super().__call__(*args, **kwargs)

# Before returning, we have the chance to modify the newly created
# Component instance, so we take the chance and set up the I/O sockets

# If `component.set_output_types()` was called in the component constructor,
# `__canals_output__` is already populated, no need to do anything.
if not hasattr(instance, "__canals_output__"):
# If that's not the case, we need to populate `__canals_output__`
#
# If the `run` method was decorated, it has a `_output_types_cache` field assigned
# that stores the output specification.
# We deepcopy the content of the cache to transfer ownership from the class method
# to the actual instance, so that different instances of the same class won't share this data.
instance.__canals_output__ = deepcopy(getattr(instance.run, "_output_types_cache", {}))

# Create the sockets if set_input_types() wasn't called in the constructor.
# If it was called and there are some parameters also in the `run()` method, these take precedence.
if not hasattr(instance, "__canals_input__"):
instance.__canals_input__ = {}
run_signature = inspect.signature(getattr(cls, "run"))
for param in list(run_signature.parameters)[1:]: # First is 'self' and it doesn't matter.
if run_signature.parameters[param].kind == inspect.Parameter.POSITIONAL_OR_KEYWORD: # ignore `**kwargs`
instance.__canals_input__[param] = InputSocket(
name=param,
type=run_signature.parameters[param].annotation,
is_mandatory=run_signature.parameters[param].default == inspect.Parameter.empty,
)
return instance


class _Component:
"""
See module's docstring.
Args:
class_: the class that Canals should use as a component.
serializable: whether to check, at init time, if the component can be saved with
`save_pipelines()`.
Returns:
A class that can be recognized as a component.
Raises:
ComponentError: if the class provided has no `run()` method or otherwise doesn't respect the component contract.
"""

def __init__(self):
self.registry = {}

def set_input_types(self, instance, **types):
"""
Method that specifies the input types when 'kwargs' is passed to the run method.
Use as:
```python
@component
class MyComponent:
def __init__(self, value: int):
component.set_input_types(value_1=str, value_2=str)
...
@component.output_types(output_1=int, output_2=str)
def run(self, **kwargs):
return {"output_1": kwargs["value_1"], "output_2": ""}
```
Note that if the `run()` method also specifies some parameters, those will take precedence.
For example:
```python
@component
class MyComponent:
def __init__(self, value: int):
component.set_input_types(value_1=str, value_2=str)
...
@component.output_types(output_1=int, output_2=str)
def run(self, value_0: str, value_1: Optional[str] = None, **kwargs):
return {"output_1": kwargs["value_1"], "output_2": ""}
```
would add a mandatory `value_0` parameters, make the `value_1`
parameter optional with a default None, and keep the `value_2`
parameter mandatory as specified in `set_input_types`.
"""
instance.__canals_input__ = {name: InputSocket(name=name, type=type_) for name, type_ in types.items()}

def set_output_types(self, instance, **types):
"""
Method that specifies the output types when the 'run' method is not decorated
with 'component.output_types'.
Use as:
```python
@component
class MyComponent:
def __init__(self, value: int):
component.set_output_types(output_1=int, output_2=str)
...
# no decorators here
def run(self, value: int):
return {"output_1": 1, "output_2": "2"}
```
"""
instance.__canals_output__ = {name: OutputSocket(name=name, type=type_) for name, type_ in types.items()}

def output_types(self, **types):
"""
Decorator factory that specifies the output types of a component.
Use as:
```python
@component
class MyComponent:
@component.output_types(output_1=int, output_2=str)
def run(self, value: int):
return {"output_1": 1, "output_2": "2"}
```
"""

def output_types_decorator(run_method):
"""
This happens at class creation time, and since we don't have the decorated
class available here, we temporarily store the output types as an attribute of
the decorated method. The ComponentMeta metaclass will use this data to create
sockets at instance creation time.
"""
setattr(
run_method,
"_output_types_cache",
{name: OutputSocket(name=name, type=type_) for name, type_ in types.items()},
)
return run_method

return output_types_decorator

def _component(self, class_):
"""
Decorator validating the structure of the component and registering it in the components registry.
"""
logger.debug("Registering %s as a component", class_)

# Check for required methods and fail as soon as possible
if not hasattr(class_, "run"):
raise ComponentError(f"{class_.__name__} must have a 'run()' method. See the docs for more information.")

def copy_class_namespace(namespace):
"""
This is the callback that `typing.new_class` will use
to populate the newly created class. We just copy
the whole namespace from the decorated class.
"""
for key, val in dict(class_.__dict__).items():
namespace[key] = val

# Recreate the decorated component class so it uses our metaclass
class_ = new_class(class_.__name__, class_.__bases__, {"metaclass": ComponentMeta}, copy_class_namespace)

# Save the component in the class registry (for deserialization)
class_path = f"{class_.__module__}.{class_.__name__}"
if class_path in self.registry:
# Corner case, but it may occur easily in notebooks when re-running cells.
logger.debug(
"Component %s is already registered. Previous imported from '%s', new imported from '%s'",
class_path,
self.registry[class_path],
class_,
)
self.registry[class_path] = class_
logger.debug("Registered Component %s", class_)

return class_

def __call__(self, class_):
return self._component(class_)


component = _Component()
Loading

0 comments on commit 011e32e

Please sign in to comment.