Skip to content

Commit

Permalink
Lunary: feedback tracking (#3332)
Browse files Browse the repository at this point in the history
* Lunary: feedback tracking

* fix incorrect param order
  • Loading branch information
vincelwt authored Oct 21, 2024
1 parent 2c6cf12 commit 4a9ffe7
Show file tree
Hide file tree
Showing 8 changed files with 80 additions and 36 deletions.
19 changes: 13 additions & 6 deletions packages/components/src/handler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -243,12 +243,14 @@ class ExtendedLunaryHandler extends LunaryHandler {
databaseEntities: IDatabaseEntity
currentRunId: string | null
thread: any
apiMessageId: string

constructor({ flowiseOptions, ...options }: any) {
super(options)
this.appDataSource = flowiseOptions.appDataSource
this.databaseEntities = flowiseOptions.databaseEntities
this.chatId = flowiseOptions.chatId
this.apiMessageId = flowiseOptions.apiMessageId
}

async initThread() {
Expand All @@ -258,14 +260,18 @@ class ExtendedLunaryHandler extends LunaryHandler {
}
})

const userId = entity?.email ?? entity?.id

this.thread = lunary.openThread({
id: this.chatId,
userId: entity?.email ?? entity?.id,
userProps: {
name: entity?.name ?? undefined,
email: entity?.email ?? undefined,
phone: entity?.phone ?? undefined
}
userId,
userProps: userId
? {
name: entity?.name ?? undefined,
email: entity?.email ?? undefined,
phone: entity?.phone ?? undefined
}
: undefined
})
}

Expand Down Expand Up @@ -298,6 +304,7 @@ class ExtendedLunaryHandler extends LunaryHandler {
const answer = outputs.output

this.thread.trackMessage({
id: this.apiMessageId,
content: answer,
role: 'assistant'
})
Expand Down
3 changes: 2 additions & 1 deletion packages/server/src/controllers/openai-realtime/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,8 @@ const executeAgentTool = async (req: Request, res: Response, next: NextFunction)
req.params.id,
req.body.chatId,
req.body.toolName,
req.body.inputArgs
req.body.inputArgs,
req.body.apiMessageId
)
return res.json(apiResponse)
} catch (error) {
Expand Down
14 changes: 11 additions & 3 deletions packages/server/src/services/openai-realtime/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import { v4 as uuidv4 } from 'uuid'
const SOURCE_DOCUMENTS_PREFIX = '\n\n----FLOWISE_SOURCE_DOCUMENTS----\n\n'
const ARTIFACTS_PREFIX = '\n\n----FLOWISE_ARTIFACTS----\n\n'

const buildAndInitTool = async (chatflowid: string, _chatId?: string) => {
const buildAndInitTool = async (chatflowid: string, _chatId?: string, _apiMessageId?: string) => {
const appServer = getRunningExpressApp()
const chatflow = await appServer.AppDataSource.getRepository(ChatFlow).findOneBy({
id: chatflowid
Expand All @@ -22,6 +22,7 @@ const buildAndInitTool = async (chatflowid: string, _chatId?: string) => {
}

const chatId = _chatId || uuidv4()
const apiMessageId = _apiMessageId || uuidv4()
const flowData = JSON.parse(chatflow.flowData)
const nodes = flowData.nodes
const edges = flowData.edges
Expand Down Expand Up @@ -62,6 +63,7 @@ const buildAndInitTool = async (chatflowid: string, _chatId?: string) => {
chatId: chatId,
sessionId: chatId,
chatflowid,
apiMessageId,
appDataSource: appServer.AppDataSource
})

Expand Down Expand Up @@ -113,9 +115,15 @@ const getAgentTools = async (chatflowid: string): Promise<any> => {
}
}

const executeAgentTool = async (chatflowid: string, chatId: string, toolName: string, inputArgs: string): Promise<any> => {
const executeAgentTool = async (
chatflowid: string,
chatId: string,
toolName: string,
inputArgs: string,
apiMessageId?: string
): Promise<any> => {
try {
const agent = await buildAndInitTool(chatflowid, chatId)
const agent = await buildAndInitTool(chatflowid, chatId, apiMessageId)
const tools = agent.tools
const tool = tools.find((tool: any) => tool.name === toolName)

Expand Down
2 changes: 2 additions & 0 deletions packages/server/src/utils/buildAgentGraph.ts
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ import logger from './logger'
export const buildAgentGraph = async (
chatflow: IChatFlow,
chatId: string,
apiMessageId: string,
sessionId: string,
incomingInput: IncomingInput,
isInternal: boolean,
Expand Down Expand Up @@ -114,6 +115,7 @@ export const buildAgentGraph = async (
startingNodeIds,
reactFlowNodes: nodes,
reactFlowEdges: edges,
apiMessageId,
graph,
depthQueue,
componentNodes: appServer.nodesPool.componentNodes,
Expand Down
53 changes: 28 additions & 25 deletions packages/server/src/utils/buildChatflow.ts
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,8 @@ export const utilBuildChatflow = async (req: Request, isInternal: boolean = fals
const nodes = parsedFlowData.nodes
const edges = parsedFlowData.edges

const apiMessageId = uuidv4()

/*** Get session ID ***/
const memoryNode = findMemoryNode(nodes, edges)
const memoryType = memoryNode?.data.label
Expand All @@ -217,6 +219,7 @@ export const utilBuildChatflow = async (req: Request, isInternal: boolean = fals
chatflow,
isInternal,
chatId,
apiMessageId,
memoryType ?? '',
sessionId,
userMessageDateTime,
Expand Down Expand Up @@ -339,6 +342,7 @@ export const utilBuildChatflow = async (req: Request, isInternal: boolean = fals
reactFlowEdges: edges,
graph,
depthQueue,
apiMessageId,
componentNodes: appServer.nodesPool.componentNodes,
question: incomingInput.question,
chatHistory,
Expand Down Expand Up @@ -369,6 +373,7 @@ export const utilBuildChatflow = async (req: Request, isInternal: boolean = fals
chatflowid,
chatId,
sessionId,
apiMessageId,
chatHistory,
...incomingInput.overrideConfig
}
Expand All @@ -394,29 +399,23 @@ export const utilBuildChatflow = async (req: Request, isInternal: boolean = fals

isStreamValid = (req.body.streaming === 'true' || req.body.streaming === true) && isStreamValid

let result = isStreamValid
? await nodeInstance.run(nodeToExecuteData, incomingInput.question, {
chatId,
chatflowid,
logger,
appDataSource: appServer.AppDataSource,
databaseEntities,
analytic: chatflow.analytic,
uploads: incomingInput.uploads,
prependMessages,
sseStreamer: appServer.sseStreamer,
shouldStreamResponse: isStreamValid
})
: await nodeInstance.run(nodeToExecuteData, incomingInput.question, {
chatId,
chatflowid,
logger,
appDataSource: appServer.AppDataSource,
databaseEntities,
analytic: chatflow.analytic,
uploads: incomingInput.uploads,
prependMessages
})
const runParams = {
chatId,
chatflowid,
apiMessageId,
logger,
appDataSource: appServer.AppDataSource,
databaseEntities,
analytic: chatflow.analytic,
uploads: incomingInput.uploads,
prependMessages
}

let result = await nodeInstance.run(nodeToExecuteData, incomingInput.question, {
...runParams,
...(isStreamValid && { sseStreamer: appServer.sseStreamer, shouldStreamResponse: true })
})

result = typeof result === 'string' ? { text: result } : result

// Retrieve threadId from assistant if exists
Expand All @@ -443,7 +442,8 @@ export const utilBuildChatflow = async (req: Request, isInternal: boolean = fals
else if (result.json) resultText = '```json\n' + JSON.stringify(result.json, null, 2)
else resultText = JSON.stringify(result, null, 2)

const apiMessage: Omit<IChatMessage, 'id' | 'createdDate'> = {
const apiMessage: Omit<IChatMessage, 'createdDate'> = {
id: apiMessageId,
role: 'apiMessage',
content: resultText,
chatflowid,
Expand Down Expand Up @@ -507,6 +507,7 @@ const utilBuildAgentResponse = async (
agentflow: IChatFlow,
isInternal: boolean,
chatId: string,
apiMessageId: string,
memoryType: string,
sessionId: string,
userMessageDateTime: Date,
Expand All @@ -523,6 +524,7 @@ const utilBuildAgentResponse = async (
const streamResults = await buildAgentGraph(
agentflow,
chatId,
apiMessageId,
sessionId,
incomingInput,
isInternal,
Expand All @@ -546,7 +548,8 @@ const utilBuildAgentResponse = async (
}
await utilAddChatMessage(userMessage)

const apiMessage: Omit<IChatMessage, 'id' | 'createdDate'> = {
const apiMessage: Omit<IChatMessage, 'createdDate'> = {
id: apiMessageId,
role: 'apiMessage',
content: finalResult,
chatflowid: agentflow.id,
Expand Down
3 changes: 3 additions & 0 deletions packages/server/src/utils/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -430,6 +430,7 @@ type BuildFlowParams = {
chatId: string
sessionId: string
chatflowid: string
apiMessageId: string
appDataSource: DataSource
overrideConfig?: ICommonObject
cachePool?: CachePool
Expand All @@ -452,6 +453,7 @@ export const buildFlow = async ({
componentNodes,
question,
chatHistory,
apiMessageId,
chatId,
sessionId,
chatflowid,
Expand Down Expand Up @@ -524,6 +526,7 @@ export const buildFlow = async ({
sessionId,
chatflowid,
chatHistory,
apiMessageId,
logger,
appDataSource,
databaseEntities,
Expand Down
17 changes: 17 additions & 0 deletions packages/server/src/utils/updateChatMessageFeedback.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
import { IChatMessageFeedback } from '../Interface'
import { getRunningExpressApp } from '../utils/getRunningExpressApp'
import { ChatMessageFeedback } from '../database/entities/ChatMessageFeedback'
import { ChatFlow } from '../database/entities/ChatFlow'
import lunary from 'lunary'

/**
* Method that updates chat message feedback.
Expand All @@ -11,6 +13,21 @@ export const utilUpdateChatMessageFeedback = async (id: string, chatMessageFeedb
const appServer = getRunningExpressApp()
const newChatMessageFeedback = new ChatMessageFeedback()
Object.assign(newChatMessageFeedback, chatMessageFeedback)

await appServer.AppDataSource.getRepository(ChatMessageFeedback).update({ id }, chatMessageFeedback)

// Fetch the updated entity
const updatedFeedback = await appServer.AppDataSource.getRepository(ChatMessageFeedback).findOne({ where: { id } })

const chatflow = await appServer.AppDataSource.getRepository(ChatFlow).findOne({ where: { id: updatedFeedback?.chatflowid } })
const analytic = JSON.parse(chatflow?.analytic ?? '{}')

if (analytic?.lunary?.status === true && updatedFeedback?.rating) {
lunary.trackFeedback(updatedFeedback.messageId, {
comment: updatedFeedback?.content,
thumb: updatedFeedback?.rating === 'THUMBS_UP' ? 'up' : 'down'
})
}

return { status: 'OK' }
}
5 changes: 4 additions & 1 deletion packages/server/src/utils/upsertVector.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import { UpsertHistory } from '../database/entities/UpsertHistory'
import { InternalFlowiseError } from '../errors/internalFlowiseError'
import { StatusCodes } from 'http-status-codes'
import { getErrorMessage } from '../errors/utils'

import { v4 as uuidv4 } from 'uuid'
/**
* Upsert documents
* @param {Request} req
Expand Down Expand Up @@ -108,6 +108,8 @@ export const upsertVector = async (req: Request, isInternal: boolean = false) =>
const nodes = parsedFlowData.nodes
const edges = parsedFlowData.edges

const apiMessageId = req.body.apiMessageId ?? uuidv4()

let stopNodeId = incomingInput?.stopNodeId ?? ''
let chatHistory: IMessage[] = []
let chatId = incomingInput.chatId ?? ''
Expand Down Expand Up @@ -162,6 +164,7 @@ export const upsertVector = async (req: Request, isInternal: boolean = false) =>
question: incomingInput.question,
chatHistory,
chatId,
apiMessageId,
sessionId: sessionId ?? '',
chatflowid,
appDataSource: appServer.AppDataSource,
Expand Down

0 comments on commit 4a9ffe7

Please sign in to comment.