Skip to content

Commit

Permalink
feat(debug): add checkpoint debug events (#323)
Browse files Browse the repository at this point in the history
* feat(debug): add checkpoint debug events

* fix(algo): only return channels which actually triggered the task (#324)

* Match checkpoint events with Python

* Revert triggers PR
  • Loading branch information
dqbd authored Aug 21, 2024
1 parent aaed10c commit 7444211
Show file tree
Hide file tree
Showing 7 changed files with 478 additions and 78 deletions.
78 changes: 39 additions & 39 deletions langgraph/src/pregel/algo.ts
Original file line number Diff line number Diff line change
Expand Up @@ -360,26 +360,27 @@ export function _prepareNextTasks<
);
continue;
}
const triggers = [TASKS];
const metadata = {
langgraph_step: step,
langgraph_node: packet.node,
langgraph_triggers: triggers,
langgraph_task_idx: tasks.length,
};
const checkpointNamespace =
parentNamespace === ""
? packet.node
: `${parentNamespace}${CHECKPOINT_NAMESPACE_SEPARATOR}${packet.node}`;
const taskId = uuid5(
JSON.stringify([checkpointNamespace, metadata]),
checkpoint.id
);

if (forExecution) {
const proc = processes[packet.node];
const node = proc.getNode();
if (node !== undefined) {
const triggers = [TASKS];
const metadata = {
langgraph_step: step,
langgraph_node: packet.node,
langgraph_triggers: triggers,
langgraph_task_idx: tasks.length,
};
const writes: [keyof Cc, unknown][] = [];
const checkpointNamespace =
parentNamespace === ""
? packet.node
: `${parentNamespace}${CHECKPOINT_NAMESPACE_SEPARATOR}${packet.node}`;
const taskId = uuid5(
JSON.stringify([checkpointNamespace, metadata]),
checkpoint.id
);
tasks.push({
name: packet.node,
input: packet.args,
Expand Down Expand Up @@ -417,17 +418,14 @@ export function _prepareNextTasks<
});
}
} else {
taskDescriptions.push({
name: packet.node,
input: packet.args,
});
taskDescriptions.push({ id: taskId, name: packet.node });
}
}

// Check if any processes should be run in next step
// If so, prepare the values to be passed to them
for (const [name, proc] of Object.entries<PregelNode>(processes)) {
const hasUpdatedChannels = proc.triggers
const updatedChannels = proc.triggers
.filter((chan) => {
try {
readChannel(channels, chan, false);
Expand All @@ -436,11 +434,13 @@ export function _prepareNextTasks<
return false;
}
})
.some(
.filter(
(chan) =>
getChannelVersion(newCheckpoint, chan) >
getVersionSeen(newCheckpoint, name, chan)
);

const hasUpdatedChannels = updatedChannels.length > 0;
// If any of the channels read by this process were updated
if (hasUpdatedChannels) {
// eslint-disable-next-line @typescript-eslint/no-explicit-any
Expand Down Expand Up @@ -493,6 +493,23 @@ export function _prepareNextTasks<
val = proc.mapper(val);
}

const metadata = {
langgraph_step: step,
langgraph_node: name,
langgraph_triggers: proc.triggers,
langgraph_task_idx: tasks.length,
};

const checkpointNamespace =
parentNamespace === ""
? name
: `${parentNamespace}${CHECKPOINT_NAMESPACE_SEPARATOR}${name}`;

const taskId = uuid5(
JSON.stringify([checkpointNamespace, metadata]),
checkpoint.id
);

if (forExecution) {
// Update seen versions
if (!newCheckpoint.versions_seen[name]) {
Expand All @@ -508,20 +525,6 @@ export function _prepareNextTasks<

const node = proc.getNode();
if (node !== undefined) {
const metadata = {
langgraph_step: step,
langgraph_node: name,
langgraph_triggers: proc.triggers,
langgraph_task_idx: tasks.length,
};
const checkpointNamespace =
parentNamespace === ""
? name
: `${parentNamespace}${CHECKPOINT_NAMESPACE_SEPARATOR}${name}`;
const taskId = uuid5(
JSON.stringify([checkpointNamespace, metadata]),
checkpoint.id
);
const writes: [keyof Cc, unknown][] = [];
tasks.push({
name,
Expand Down Expand Up @@ -562,10 +565,7 @@ export function _prepareNextTasks<
});
}
} else {
taskDescriptions.push({
name,
input: val,
});
taskDescriptions.push({ id: taskId, name });
}
}
}
Expand Down
91 changes: 79 additions & 12 deletions langgraph/src/pregel/debug.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,14 @@
import { RunnableConfig } from "@langchain/core/runnables";
import { BaseChannel } from "../channels/base.js";
import { CheckpointMetadata } from "../checkpoint/types.js";
import {
CheckpointMetadata,
CheckpointPendingWrite,
PendingWrite,
} from "../checkpoint/types.js";
import { uuid5 } from "../checkpoint/id.js";
import { TAG_HIDDEN, TASK_NAMESPACE } from "../constants.js";
import { ERROR, TAG_HIDDEN, TASK_NAMESPACE } from "../constants.js";
import { EmptyChannelError } from "../errors.js";
import { PregelExecutableTask } from "./types.js";
import { PregelExecutableTask, PregelTaskDescription } from "./types.js";
import { readChannels } from "./io.js";

type ConsoleColors = {
Expand Down Expand Up @@ -82,14 +86,19 @@ export function* mapDebugTasks<N extends PropertyKey, C extends PropertyKey>(
if (config?.tags?.includes(TAG_HIDDEN)) continue;

const metadata = { ...config?.metadata };
delete metadata.checkpoint_id;
const idMetadata = {
langgraph_step: metadata.langgraph_step,
langgraph_node: metadata.langgraph_node,
langgraph_triggers: metadata.langgraph_triggers,
langgraph_task_idx: metadata.langgraph_task_idx,
};

yield {
type: "task",
timestamp: ts,
step,
payload: {
id: uuid5(JSON.stringify([name, step, metadata]), TASK_NAMESPACE),
id: uuid5(JSON.stringify([name, step, idMetadata]), TASK_NAMESPACE),
name,
input,
triggers,
Expand All @@ -111,14 +120,19 @@ export function* mapDebugTaskResults<
if (config?.tags?.includes(TAG_HIDDEN)) continue;

const metadata = { ...config?.metadata };
delete metadata.checkpoint_id;
const idMetadata = {
langgraph_step: metadata.langgraph_step,
langgraph_node: metadata.langgraph_node,
langgraph_triggers: metadata.langgraph_triggers,
langgraph_task_idx: metadata.langgraph_task_idx,
};

yield {
type: "task_result",
timestamp: ts,
step,
payload: {
id: uuid5(JSON.stringify([name, step, metadata]), TASK_NAMESPACE),
id: uuid5(JSON.stringify([name, step, idMetadata]), TASK_NAMESPACE),
name,
result: writes.filter(([channel]) =>
streamChannelsList.includes(channel)
Expand All @@ -128,13 +142,51 @@ export function* mapDebugTaskResults<
}
}

export function* mapDebugCheckpoint(
export function* mapDebugCheckpoint<
N extends PropertyKey,
C extends PropertyKey
>(
step: number,
config: RunnableConfig,
channels: Record<string, BaseChannel>,
streamChannels: string | string[],
metadata: CheckpointMetadata
metadata: CheckpointMetadata,
tasks: readonly PregelExecutableTask<N, C>[],
pendingWrites: CheckpointPendingWrite[]
) {
function formatConfig(config: RunnableConfig) {
// https://stackoverflow.com/a/78298178
type CamelToSnake<
T extends string,
A extends string = ""
> = T extends `${infer F}${infer R}`
? CamelToSnake<
R,
`${A}${F extends Lowercase<F> ? F : `_${Lowercase<F>}`}`
>
: A;

// make sure the config is consistent with Python
const pyConfig: Partial<
Record<CamelToSnake<keyof RunnableConfig>, unknown>
> = {};

if (config.callbacks != null) pyConfig.callbacks = config.callbacks;
if (config.configurable != null)
pyConfig.configurable = config.configurable;
if (config.maxConcurrency != null)
pyConfig.max_concurrency = config.maxConcurrency;

if (config.metadata != null) pyConfig.metadata = config.metadata;
if (config.recursionLimit != null)
pyConfig.recursion_limit = config.recursionLimit;
if (config.runId != null) pyConfig.run_id = config.runId;
if (config.runName != null) pyConfig.run_name = config.runName;
if (config.tags != null) pyConfig.tags = config.tags;

return pyConfig;
}

function getCurrentUTC() {
const now = new Date();
return new Date(now.getTime() - now.getTimezoneOffset() * 60 * 1000);
Expand All @@ -146,13 +198,29 @@ export function* mapDebugCheckpoint(
timestamp: ts,
step,
payload: {
config,
config: formatConfig(config),
values: readChannels(channels, streamChannels),
metadata,
next: tasks.map((task) => task.name),
tasks: tasksWithWrites(tasks, pendingWrites),
},
};
}

function tasksWithWrites<N extends PropertyKey, C extends PropertyKey>(
tasks: readonly PregelExecutableTask<N, C>[],
pendingWrites: CheckpointPendingWrite[]
): PregelTaskDescription[] {
return tasks.map((task): PregelTaskDescription => {
const error = pendingWrites.find(
([id, n]) => id === task.id && n === ERROR
)?.[2];

if (error) return { id: task.id, name: task.name as string, error };
return { id: task.id, name: task.name as string };
});
}

export function printStepCheckpoint(
step: number,
channels: Record<string, BaseChannel<unknown>>,
Expand Down Expand Up @@ -194,8 +262,7 @@ export function printStepTasks<N extends PropertyKey, C extends PropertyKey>(

export function printStepWrites(
step: number,
// eslint-disable-next-line @typescript-eslint/no-explicit-any
writes: Array<[string, any]>,
writes: PendingWrite[],
whitelist: string[]
): void {
// eslint-disable-next-line @typescript-eslint/no-explicit-any
Expand Down
2 changes: 1 addition & 1 deletion langgraph/src/pregel/io.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { BaseChannel } from "../channels/base.js";
import type { BaseChannel } from "../channels/base.js";
import type { PregelExecutableTask } from "./types.js";
import type { PendingWrite } from "../checkpoint/types.js";
import { TAG_HIDDEN } from "../constants.js";
Expand Down
36 changes: 21 additions & 15 deletions langgraph/src/pregel/loop.ts
Original file line number Diff line number Diff line change
Expand Up @@ -336,6 +336,27 @@ export class PregelLoop {
}
);
this.tasks = nextTasks;

// Produce debug output
if (this.checkpointer) {
this.stream.push(
...(await gatherIterator(
prefixGenerator(
mapDebugCheckpoint(
this.step - 1, // printing checkpoint for previous step
this.checkpointConfig,
this.channels,
this.graph.streamChannelsAsIs as string[],
this.checkpointMetadata,
this.tasks,
this.checkpointPendingWrites
),
"debug"
)
))
);
}

if (this.tasks.length === 0) {
this.status = "done";
return false;
Expand Down Expand Up @@ -489,21 +510,6 @@ export class PregelLoop {
checkpoint_id: this.checkpoint.id,
},
};

// Produce debug output
const debugOutput = await gatherIterator(
prefixGenerator(
mapDebugCheckpoint(
this.step,
this.checkpointConfig,
this.channels,
this.graph.streamChannelsAsIs as string[],
this.checkpointMetadata
),
"debug"
)
);
this.stream.push(...debugOutput);
}
this.step += 1;
}
Expand Down
3 changes: 2 additions & 1 deletion langgraph/src/pregel/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -68,8 +68,9 @@ export type PregelParams<
> = Omit<PregelInterface<Nn, Cc>, "streamChannelsAsIs">;

export interface PregelTaskDescription {
readonly id: string;
readonly name: string;
readonly input: unknown;
readonly error?: unknown;
}

export interface PregelExecutableTask<
Expand Down
Loading

0 comments on commit 7444211

Please sign in to comment.