diff --git a/app/generative_ui/ai/message.tsx b/app/generative_ui/ai/message.tsx
new file mode 100644
index 0000000..1b4c911
--- /dev/null
+++ b/app/generative_ui/ai/message.tsx
@@ -0,0 +1,13 @@
+"use client";
+
+import { StreamableValue, useStreamableValue } from "ai/rsc";
+
+export function AIMessage(props: { value: StreamableValue }) {
+  const [data] = useStreamableValue(props.value);
+
+  return (
+    <div>
+      {data}
+    </div>
+  );
+}
diff --git a/app/generative_ui/page.tsx b/app/generative_ui/page.tsx
index 9152fee..03d2e8a 100644
--- a/app/generative_ui/page.tsx
+++ b/app/generative_ui/page.tsx
@@ -4,7 +4,6 @@ import { useState } from "react";
 import type { EndpointsContext } from "./agent";
 import { useActions } from "./utils/client";
 import { LocalContext } from "./shared";
-import { readStreamableValue } from "ai/rsc";
 
 export default function GenerativeUIPage() {
   const actions = useActions();
@@ -35,16 +34,12 @@ export default function GenerativeUIPage() {
     // consume the value stream to obtain the final string value
     // after which we can append to our chat history state
     (async () => {
-      let finalValue: string | null = null;
-      for await (const value of readStreamableValue(element.value)) {
-        finalValue = value;
-      }
-
-      if (finalValue != null) {
+      let lastEvent = await element.lastEvent;
+      if (typeof lastEvent === "string") {
         setHistory((prev) => [
           ...prev,
           ["user", input],
-          ["assistant", finalValue as string],
+          ["assistant", lastEvent],
         ]);
       }
     })();
diff --git a/app/generative_ui/utils/server.tsx b/app/generative_ui/utils/server.tsx
index 2774097..f4356cf 100644
--- a/app/generative_ui/utils/server.tsx
+++ b/app/generative_ui/utils/server.tsx
@@ -15,6 +15,7 @@ import {
   StreamEvent,
 } from "@langchain/core/tracers/log_stream";
 import { AIProvider } from "./client";
+import { AIMessage } from "../ai/message";
 
 /**
  * Executes `streamEvents` method on a runnable
@@ -28,14 +29,14 @@ export function streamRunnableUI<RunInput, RunOutput>(
   inputs: RunInput,
 ) {
   const ui = createStreamableUI();
-  const value = createStreamableValue();
+  const [lastEvent, resolve] = withResolvers();
 
   (async () => {
-    let lastEvent: StreamEvent | null = null;
+    let lastEventValue: StreamEvent | null = null;
 
-    const streamableMap: Record<
+    const callbacks: Record<
       string,
-      ReturnType<typeof createStreamableUI>
+      ReturnType<typeof createStreamableValue>
     > = {};
 
     for await (const streamEvent of runnable.streamEvents(inputs, {
@@ -48,34 +49,30 @@ export function streamRunnableUI<RunInput, RunOutput>(
         if (isValidElement(chunk)) {
           ui.append(chunk);
         } else if ("text" in chunk && typeof chunk.text === "string") {
-          if (!streamableMap[streamEvent.run_id]) {
-            streamableMap[streamEvent.run_id] = createStreamableUI();
-            const value = streamableMap[streamEvent.run_id].value;
-
-            // create an AI message
-            ui.append(
-              <div>
-                {value}
-              </div>,
-            );
+          if (!callbacks[streamEvent.run_id]) {
+            // the createStreamableValue / useStreamableValue is preferred
+            // as the stream events are updated immediately in the UI
+            // rather than being batched by React via createStreamableUI
+            const textStream = createStreamableValue();
+            ui.append(<AIMessage value={textStream.value} />);
+            callbacks[streamEvent.run_id] = textStream;
           }
-          streamableMap[streamEvent.run_id].append(chunk.text);
+          callbacks[streamEvent.run_id].append(chunk.text);
         }
       }
 
-      lastEvent = streamEvent;
+      lastEventValue = streamEvent;
     }
 
-    value.done(lastEvent?.data.output);
+    // resolve the promise, which will be sent
+    // to the client thanks to RSC
+    resolve(lastEventValue?.data.output);
 
-    for (const ui of Object.values(streamableMap)) ui.done();
+    Object.values(callbacks).forEach((cb) => cb.done());
     ui.done();
   })();
 
-  return {
-    ui: ui.value,
-    value: value.value,
-  };
+  return { ui: ui.value, lastEvent };
 }
 
 /**
@@ -147,3 +144,19 @@ export function exposeEndpoints<T extends Record<string, unknown>>(
     return <AIProvider actions={actions}>{props.children}</AIProvider>;
  };
 }
+
+/**
+ * Polyfill to emulate the upcoming Promise.withResolvers
+ */
+export function withResolvers<T>() {
+  let resolve: (value: T) => void;
+  let reject: (reason?: any) => void;
+
+  const innerPromise = new Promise<T>((res, rej) => {
+    resolve = res;
+    reject = rej;
+  });
+
+  // @ts-expect-error
+  return [innerPromise, resolve, reject] as const;
+}