Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make GraphProcessor trace optional #460

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion packages/core/src/api/createProcessor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ export type RunGraphOptions = {
};
abortSignal?: AbortSignal;
registry?: NodeRegistration;
includeTrace?: boolean;
getChatNodeEndpoint?: ProcessContext['getChatNodeEndpoint'];
} & {
[P in keyof ProcessEvents as `on${PascalCase<P>}`]?: (params: ProcessEvents[P]) => void;
Expand Down Expand Up @@ -75,7 +76,7 @@ export function coreCreateProcessor(project: Project, options: RunGraphOptions)
throw new Error(`Graph not found, and no main graph specified.`);
}

const processor = new GraphProcessor(project, graphId as GraphId, options.registry);
const processor = new GraphProcessor(project, graphId as GraphId, options.registry, options.includeTrace);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it would be preferred for optional properties to be grouped into an options object so they can be specified in any order, but this class is exported out of core which technically makes it part of the public interface. While I think most users would use it through createGraphProcessor we can't update this without a breaking change, but we can at least note this for the future


if (options.onStart) {
processor.on('start', options.onStart);
Expand Down
118 changes: 82 additions & 36 deletions packages/core/src/model/GraphProcessor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,8 @@ export class GraphProcessor {
readonly #registry: NodeRegistration;
id = nanoid();

#includeTrace?: boolean = true;

executor?: 'nodejs' | 'browser';

/** If set, specifies the node(s) that the graph will run TO, instead of the nodes without any dependents. */
Expand Down Expand Up @@ -246,7 +248,7 @@ export class GraphProcessor {
return this.#running;
}

constructor(project: Project, graphId?: GraphId, registry?: NodeRegistration) {
constructor(project: Project, graphId?: GraphId, registry?: NodeRegistration, includeTrace?: boolean) {
this.#project = project;
const graph = graphId
? project.graphs[graphId]
Expand All @@ -259,6 +261,7 @@ export class GraphProcessor {
}
this.#graph = graph;

this.#includeTrace = includeTrace;
this.#nodeInstances = {};
this.#connections = {};
this.#nodesById = {};
Expand Down Expand Up @@ -747,7 +750,9 @@ export class GraphProcessor {
if (this.#hasPreloadedData) {
for (const node of this.#graph.nodes) {
if (this.#nodeResults.has(node.id)) {
this.#emitter.emit('trace', `Node ${node.title} has preloaded data`);
if(this.#includeTrace){
this.#emitter.emit('trace', `Node ${node.title} has preloaded data`);
}

await this.#emitter.emit('nodeStart', {
node,
Expand Down Expand Up @@ -869,10 +874,12 @@ export class GraphProcessor {
if (!inputsReady) {
return;
}
this.#emitter.emit(
'trace',
`Node ${node.title} has required inputs nodes: ${inputNodes.map((n) => n.title).join(', ')}`,
);
if(this.#includeTrace){
this.#emitter.emit(
'trace',
`Node ${node.title} has required inputs nodes: ${inputNodes.map((n) => n.title).join(', ')}`,
);
}
Comment on lines +877 to +882
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also we should consider adding a small util to wrap this so future calls to emitting trace don't miss the check


const attachedData = this.#getAttachedDataTo(node);

Expand All @@ -898,7 +905,9 @@ export class GraphProcessor {
this.#processingQueue.addAll(
inputNodes.map((inputNode) => {
return async () => {
this.#emitter.emit('trace', `Fetching required data for node ${inputNode.title} (${inputNode.id})`);
if(this.#includeTrace){
this.#emitter.emit('trace', `Fetching required data for node ${inputNode.title} (${inputNode.id})`);
}
await this.#fetchNodeDataAndProcessNode(inputNode);
};
}),
Expand All @@ -912,32 +921,42 @@ export class GraphProcessor {
const builtInNode = node as BuiltInNodes;

if (this.#ignoreNodes.has(node.id)) {
this.#emitter.emit('trace', `Node ${node.title} is ignored`);
if(this.#includeTrace){
this.#emitter.emit('trace', `Node ${node.title} is ignored`);
}
return;
}

if (this.runToNodeIds) {
const dependencyNodes = this.getDependencyNodesDeep(node.id);

if (this.runToNodeIds.some((runTo) => runTo != node.id && dependencyNodes.includes(runTo))) {
this.#emitter.emit('trace', `Node ${node.title} is excluded due to runToNodeIds`);
if(this.#includeTrace){
this.#emitter.emit('trace', `Node ${node.title} is excluded due to runToNodeIds`);
}
return;
}
}

if (this.#currentlyProcessing.has(node.id)) {
this.#emitter.emit('trace', `Node ${node.title} is already being processed`);
if(this.#includeTrace){
this.#emitter.emit('trace', `Node ${node.title} is already being processed`);
}
return;
}

// For a loop controller, it can run multiple times, otherwise we already processed this node so bail out
if (this.#visitedNodes.has(node.id) && node.type !== 'loopController') {
this.#emitter.emit('trace', `Node ${node.title} has already been processed`);
if(this.#includeTrace){
this.#emitter.emit('trace', `Node ${node.title} has already been processed`);
}
return;
}

if (this.#erroredNodes.has(node.id)) {
this.#emitter.emit('trace', `Node ${node.title} has already errored`);
if(this.#includeTrace){
this.#emitter.emit('trace', `Node ${node.title} has already errored`);
}
return;
}

Expand All @@ -946,7 +965,9 @@ export class GraphProcessor {
// Check if all input nodes are free of errors
for (const inputNode of inputNodes) {
if (this.#erroredNodes.has(inputNode.id)) {
this.#emitter.emit('trace', `Node ${node.title} has errored input node ${inputNode.title}`);
if(this.#includeTrace){
this.#emitter.emit('trace', `Node ${node.title} has errored input node ${inputNode.title}`);
}
return;
}
}
Expand All @@ -959,18 +980,22 @@ export class GraphProcessor {
});

if (!inputsReady) {
await this.#emitter.emit(
'trace',
`Node ${node.title} has required inputs nodes: ${inputNodes.map((n) => n.title).join(', ')}`,
);
if(this.#includeTrace){
await this.#emitter.emit(
'trace',
`Node ${node.title} has required inputs nodes: ${inputNodes.map((n) => n.title).join(', ')}`,
);
}
return;
}

// Excluded because control flow is still in a loop - difference between "will not execute" and "has not executed yet"
const inputValues = this.#getInputValuesForNode(node);

if (this.#excludedDueToControlFlow(node, inputValues, nanoid() as ProcessId, 'loop-not-broken')) {
this.#emitter.emit('trace', `Node ${node.title} is excluded due to control flow`);
if(this.#includeTrace){
this.#emitter.emit('trace', `Node ${node.title} is excluded due to control flow`);
}
return;
}

Expand Down Expand Up @@ -1001,7 +1026,9 @@ export class GraphProcessor {
}

if (waitingForInputNode) {
this.#emitter.emit('trace', `Node ${node.title} is waiting for input node ${waitingForInputNode}`);
if(this.#includeTrace){
this.#emitter.emit('trace', `Node ${node.title} is waiting for input node ${waitingForInputNode}`);
}
return;
}

Expand All @@ -1018,7 +1045,9 @@ export class GraphProcessor {
}

if (attachedData.races?.completed) {
this.#emitter.emit('trace', `Node ${node.title} is part of a race that was completed`);
if(this.#includeTrace){
this.#emitter.emit('trace', `Node ${node.title} is part of a race that was completed`);
}
return;
}

Expand All @@ -1027,8 +1056,9 @@ export class GraphProcessor {
if (this.slowMode) {
await new Promise((resolve) => setTimeout(resolve, 250));
}

this.#emitter.emit('trace', `Finished processing node ${node.title} (${node.id})`);
if(this.#includeTrace){
this.#emitter.emit('trace', `Finished processing node ${node.title} (${node.id})`);
}
this.#visitedNodes.add(node.id);
this.#currentlyProcessing.delete(node.id);
this.#remainingNodes.delete(node.id);
Expand All @@ -1046,10 +1076,14 @@ export class GraphProcessor {
this.#excludedDueToControlFlow(node, this.#getInputValuesForNode(node), nanoid() as ProcessId);

if (!didBreak) {
this.#emitter.emit('trace', `Loop controller ${node.title} did not break, so we're looping again`);
if(this.#includeTrace){
this.#emitter.emit('trace', `Loop controller ${node.title} did not break, so we're looping again`);
}
for (const loopNodeId of attachedData.loopInfo?.nodes ?? []) {
const cycleNode = this.#nodesById[loopNodeId]!;
this.#emitter.emit('trace', `Clearing cycle node ${cycleNode.title} (${cycleNode.id})`);
if(this.#includeTrace){
this.#emitter.emit('trace', `Clearing cycle node ${cycleNode.title} (${cycleNode.id})`);
}
this.#visitedNodes.delete(cycleNode.id);
this.#currentlyProcessing.delete(cycleNode.id);
this.#remainingNodes.add(cycleNode.id);
Expand All @@ -1067,7 +1101,9 @@ export class GraphProcessor {
for (const [nodeId] of allNodesForRace) {
for (const [key, abortController] of this.#nodeAbortControllers.entries()) {
if (key.startsWith(nodeId)) {
this.#emitter.emit('trace', `Aborting node ${nodeId} because other race branch won`);
if(this.#includeTrace){
this.#emitter.emit('trace', `Aborting node ${nodeId} because other race branch won`);
}
abortController.abort();
}
}
Expand Down Expand Up @@ -1133,10 +1169,12 @@ export class GraphProcessor {
// Node is finished, check if we can run any more nodes that depend on this one
this.#processingQueue.addAll(
outputNodes.nodes.map((outputNode) => async () => {
this.#emitter.emit(
'trace',
`Trying to run output node from ${node.title}: ${outputNode.title} (${outputNode.id})`,
);
if(this.#includeTrace){
this.#emitter.emit(
'trace',
`Trying to run output node from ${node.title}: ${outputNode.title} (${outputNode.id})`,
);
}

await this.#processNodeIfAllInputsAvailable(outputNode);
}),
Expand Down Expand Up @@ -1385,7 +1423,9 @@ export class GraphProcessor {
#nodeErrored(node: ChartNode, e: unknown, processId: ProcessId) {
const error = getError(e);
this.#emitter.emit('nodeError', { node, error, processId });
this.#emitter.emit('trace', `Node ${node.title} (${node.id}-${processId}) errored: ${error.stack}`);
if(this.#includeTrace){
this.#emitter.emit('trace', `Node ${node.title} (${node.id}-${processId}) errored: ${error.stack}`);
}
this.#erroredNodes.set(node.id, error.toString());
}

Expand Down Expand Up @@ -1545,7 +1585,9 @@ export class GraphProcessor {
return processor;
},
trace: (message) => {
this.#emitter.emit('trace', message);
if(this.#includeTrace){
this.#emitter.emit('trace', message);
}
},
abortGraph: (error) => {
this.abort(error === undefined, error);
Expand All @@ -1572,7 +1614,9 @@ export class GraphProcessor {
typeOfExclusion: ControlFlowExcludedDataValue['value'] = undefined,
) {
if (node.disabled) {
this.#emitter.emit('trace', `Excluding node ${node.title} because it's disabled`);
if(this.#includeTrace){
this.#emitter.emit('trace', `Excluding node ${node.title} because it's disabled`);
}

this.#visitedNodes.add(node.id);
this.#markAsExcluded(node, processId, inputValues, 'disabled');
Expand Down Expand Up @@ -1606,10 +1650,12 @@ export class GraphProcessor {
if (inputIsExcludedValue && !allowedToConsumedExcludedValue) {
if (!isWaitingForLoop) {
if (inputIsExcludedValue) {
this.#emitter.emit(
'trace',
`Excluding node ${node.title} because of control flow. Input is has excluded value: ${controlFlowExcludedValues[0]?.[0]}`,
);
if(this.#includeTrace){
this.#emitter.emit(
'trace',
`Excluding node ${node.title} because of control flow. Input is has excluded value: ${controlFlowExcludedValues[0]?.[0]}`,
);
}
}

this.#visitedNodes.add(node.id);
Expand Down
Loading