-
Notifications
You must be signed in to change notification settings - Fork 3
feat: use full connection data to route I/O #148
Conversation
None
to skip branches
Unbeknownst to me that this PR/issue existed (@ZanSara informed me), I encountered its manifestation while working on a demo for a new ConditionalRouter. Here is a small code sample summarizing the connection firing issue.
The current main version of canals returns an empty {} while the canals from this branch returns the correct answer, as asserted at the end of the code sample. 👍 Perhaps this small sample could be added as a unit test. I'll let you decide if that's necessary. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Let's split this PR into mulitple smaller ones, we can follow the order of the list I see in the PR description
Just want to stress that this is semantically wrong. I kinda already went over this point on #142.
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I went through the diff an made some changes:
- Removed any reference of consumer/producer (we already have sender/receiver, I made them consistent)
- Moved
Connection
under thecomponent
package - to me the connection looks closer to sockets than pipelines - Moved some logic out of the
Pipeline
methods into theConnection
class - this makes the pipeline code more readable - Added a minimal, explicit test case for the optional input
- Added more unit tests to increase coverage
- Fixed typing to have the
pipeline
andconnection
modules clean - Added my notes here and there as code comments
- Changed variable names where I felt it could add clarity
Main changes
A small, internal (not visible as API)
Connection
object is introduced to simplify the implementation of therun()
method. Access to the same values through networkx's API is just too hard to read and error-prone. TheConnection
class simplifies access to tuples of (producer_component
,producer_socket
,consumer_component
,consumer_socket
), plus a couple of shorthands like a readable__repr__
and ais_mandatory
method for the entire connection, that returns True if a connection must be waited for before running a component.InputSocket.is_optional
, based on typing, becameInputSocket.is_mandatory
, based on the presence or absence of a default value=None
#111The full list of connections (
self._connections
) and a mapping of all the mandatory connections of each component (self._mandatory_connections
) are now Pipeline's state and are populated by theself._direct_connect
method instead of being re-computed at everyPipeline.run()
.The
inputs_buffer
object has been split into three separate data structures:components_queue: List[str]
: contains the list of the components to process, similar to the oldinputs_buffer.keys()
mandatory_values_buffer: Dict[Connection, Any]
: For each connection that exists in the pipeline where the receiving socket is mandatory, it stores the value that is being sent over that connection. UsesConnection
objects as keys to ease access.optional_values_buffer: Dict[Connection, Any]
: Semantically identical tomandatory_values_buffer
, but for connections that are not mandatory.The possible actions for a component have been reduced from four (
run
,wait
,skip
,remove
) to a boolean value whereTrue == "run"
andFalse == "wait"
Skipping branches is now done by not adding a given component to the
components_queue
instead of signaling the skip with a None. Skipped branches are detected as follows: if from no component in the components queue,networkx.has_path(component, waiting_component)
is True, then the connection will never receive a value and therefore it is not waited for.API Impact
No changes at Pipeline's API level.
Loops
Tests were added to make sure many more looping topologies are now supported. However, note how looping is easier to implement with variadic loop mergers. Fixed loop mergers do work in simple topologies but don't cover all cases, while variadics seem to enable all scenarios I could think of.
While there is a way to make non-variadic components able to behave as loop mergers, we would need to extend the API to allow the user to define alternative sets of mandatory inputs instead of a single one, as it happens now. This change can be performed in a (mostly) non-breaking way later on if we find it necessary, so it was not added in this PR.
Dynamic mandatory + fixed optional inputs
The current
PromptBuilder
shows a signature that this version would not readily support:The issue is that input sockets are read from the
set_input_types
call only, and therefore all count as mandatory. This is fixable in a non-breaking way, so it has not been included in this PR and will be addressed separately.Fixes #112
Fixes #113
Fixes #111
Fixes #105