
Commit

fix llm stream subscription
walterra committed Jan 30, 2024
1 parent 08bb76a commit c7fe139
Showing 1 changed file with 20 additions and 36 deletions.
@@ -10,7 +10,6 @@ import { BufferMemory, ChatMessageHistory } from 'langchain/memory';
 import { Tool } from '@langchain/core/tools';
 
 import { streamFactory } from '@kbn/ml-response-stream/server';
-import { RunLogPatch } from '@langchain/core/dist/tracers/log_stream';
 import { ElasticsearchStore } from '../elasticsearch_store/elasticsearch_store';
 import { ActionsClientChatOpenAI } from '../llm/openai';
 import { ActionsClientLlm } from '../llm/actions_client_llm';
@@ -111,7 +110,7 @@ export const callAgentExecutor: AgentExecutor<true | false> = async ({
     ? await initializeAgentExecutorWithOptions(tools, llm, {
         agentType: 'openai-functions',
         memory,
-        verbose: false,
+        verbose: true,
       })
     : await initializeAgentExecutorWithOptions(tools, llm, {
         agentType: 'chat-conversational-react-description',
@@ -125,10 +124,28 @@ export const callAgentExecutor: AgentExecutor<true | false> = async ({
 
   let traceData;
   if (isStream) {
-    const logStream = executor.streamLog(
+    const {
+      end: streamEnd,
+      push,
+      responseWithHeaders,
+    } = streamFactory<{ type: string; payload: string }>(request.headers, logger, false, false);
+
+    executor.stream(
       {
         input: latestMessage[0].content,
         chat_history: [],
       },
+      {
+        callbacks: [
+          {
+            handleLLMNewToken(payload) {
+              push({ payload, type: 'content' });
+            },
+            handleLLMEnd() {
+              streamEnd();
+            },
+          },
+        ],
+      }
       // TODO before merge to main
       // uncomment
@@ -139,15 +156,6 @@ export const callAgentExecutor: AgentExecutor<true | false> = async ({
       // }
     );
 
-    const {
-      end: streamEnd,
-      push,
-      responseWithHeaders,
-    } = streamFactory<{ type: string; payload: string }>(request.headers, logger, false, false);
-
-    // Do not call this using `await` so it will run asynchronously while we return the stream in responseWithHeaders
-    readStream(logStream, push, streamEnd);
-    console.log('returning responseWithHeaders', responseWithHeaders);
     // TODO before merge to main
     // figure out how to pass trace_data and replacements @spong @macri @yuliia
     return responseWithHeaders;
Expand Down Expand Up @@ -187,27 +195,3 @@ export const callAgentExecutor: AgentExecutor<true | false> = async ({
},
};
};
async function readStream(
logStream: AsyncGenerator<RunLogPatch>,
push: (arg0: { type: string; payload: string }) => void,
streamEnd: () => void
) {
push({ type: 'starting', payload: 'hello world' });
for await (const chunk of logStream) {
if (chunk.ops?.length > 0 && chunk.ops[0].op === 'add') {
const addOp = chunk.ops[0];
if (
addOp.path.startsWith('/logs/ActionsClientChatOpenAI') &&
typeof addOp.value === 'string' &&
addOp.value.length
) {
push({ type: 'content', payload: addOp.value });
await new Promise((resolve) => setTimeout(resolve, 0));
}
}
}

push({ type: 'after', payload: 'hello world' });
streamEnd();
push({ type: 'streamEnd', payload: 'hello world' });
}
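
Note: the fix drops the streamLog() subscription, which sifted RunLogPatch ops for ActionsClientChatOpenAI log entries, in favor of LangChain callback handlers that push each token straight into the streamFactory response. Below is a condensed, self-contained sketch of the resulting pattern; the helper name and the Kibana type imports are illustrative, while the calls and payload shape mirror the diff.

import type { KibanaRequest } from '@kbn/core/server';
import type { Logger } from '@kbn/logging';
import { streamFactory } from '@kbn/ml-response-stream/server';
import type { AgentExecutor } from 'langchain/agents';

// Illustrative helper: subscribe to LLM tokens via callbacks and relay
// them through the HTTP response stream.
export const streamAgentResponse = (
  executor: AgentExecutor,
  input: string,
  request: KibanaRequest,
  logger: Logger
) => {
  const {
    end: streamEnd,
    push,
    responseWithHeaders,
  } = streamFactory<{ type: string; payload: string }>(request.headers, logger, false, false);

  // As in the commit: not awaited, and the returned stream itself is not
  // consumed. The agent keeps running while the route handler returns the
  // streaming response below; tokens are observed via the callbacks.
  void executor.stream(
    { input, chat_history: [] },
    {
      callbacks: [
        {
          // Fires once per generated token.
          handleLLMNewToken(payload: string) {
            push({ payload, type: 'content' });
          },
          // Fires when generation completes; closes the response stream.
          handleLLMEnd() {
            streamEnd();
          },
        },
      ],
    }
  );

  return responseWithHeaders;
};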

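On the receiving end, the pushed { type, payload } objects should arrive as a newline-delimited JSON stream (the framing @kbn/ml-response-stream uses for object payloads; treat that as an assumption to verify against the package). A minimal fetch-based reader sketch; the URL and request body are placeholders, not the actual plugin route:

// Hypothetical consumer of the { type: 'content', payload } chunks.
async function readAssistantStream(url: string, body: unknown): Promise<string> {
  const response = await fetch(url, {
    method: 'POST',
    headers: { 'Content-Type': 'application/json' },
    body: JSON.stringify(body),
  });
  if (!response.body) throw new Error('response is not streamable');

  const reader = response.body.getReader();
  const decoder = new TextDecoder();
  let buffered = '';
  let message = '';

  while (true) {
    const { done, value } = await reader.read();
    if (done) break;
    buffered += decoder.decode(value, { stream: true });
    const lines = buffered.split('\n');
    buffered = lines.pop() ?? ''; // retain a partial trailing line
    for (const line of lines) {
      if (!line.trim()) continue;
      const chunk = JSON.parse(line) as { type: string; payload: string };
      if (chunk.type === 'content') {
        message += chunk.payload; // accumulate streamed tokens
      }
    }
  }
  return message;
}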