diff --git a/packages/components/nodes/recordmanager/PostgresRecordManager/PostgresRecordManager.ts b/packages/components/nodes/recordmanager/PostgresRecordManager/PostgresRecordManager.ts index e01d85b5b0a..59f640c6a30 100644 --- a/packages/components/nodes/recordmanager/PostgresRecordManager/PostgresRecordManager.ts +++ b/packages/components/nodes/recordmanager/PostgresRecordManager/PostgresRecordManager.ts @@ -2,6 +2,10 @@ import { ICommonObject, INode, INodeData, INodeParams } from '../../../src/Inter import { getBaseClasses, getCredentialData, getCredentialParam } from '../../../src/utils' import { ListKeyOptions, RecordManagerInterface, UpdateOptions } from '@langchain/community/indexes/base' import { DataSource, QueryRunner } from 'typeorm' +import { getHost } from '../../vectorstores/Postgres/utils' +import { getDatabase, getPort, getTableName } from './utils' + +const serverCredentialsExists = !!process.env.POSTGRES_RECORDMANAGER_USER && !!process.env.POSTGRES_RECORDMANAGER_PASSWORD class PostgresRecordManager_RecordManager implements INode { label: string @@ -29,18 +33,22 @@ class PostgresRecordManager_RecordManager implements INode { { label: 'Host', name: 'host', - type: 'string' + type: 'string', + placeholder: getHost(), + optional: !!getHost() }, { label: 'Database', name: 'database', - type: 'string' + type: 'string', + placeholder: getDatabase(), + optional: !!getDatabase() }, { label: 'Port', name: 'port', type: 'number', - placeholder: '5432', + placeholder: getPort(), optional: true }, { @@ -54,7 +62,7 @@ class PostgresRecordManager_RecordManager implements INode { label: 'Table Name', name: 'tableName', type: 'string', - placeholder: 'upsertion_records', + placeholder: getTableName(), additionalParams: true, optional: true }, @@ -110,16 +118,16 @@ class PostgresRecordManager_RecordManager implements INode { label: 'Connect Credential', name: 'credential', type: 'credential', - credentialNames: ['PostgresApi'] + credentialNames: ['PostgresApi'], + optional: serverCredentialsExists } } async init(nodeData: INodeData, _: string, options: ICommonObject): Promise { const credentialData = await getCredentialData(nodeData.credential ?? '', options) - const user = getCredentialParam('user', credentialData, nodeData) - const password = getCredentialParam('password', credentialData, nodeData) - const _tableName = nodeData.inputs?.tableName as string - const tableName = _tableName ? _tableName : 'upsertion_records' + const user = getCredentialParam('user', credentialData, nodeData, process.env.POSTGRES_RECORDMANAGER_USER) + const password = getCredentialParam('password', credentialData, nodeData, process.env.POSTGRES_RECORDMANAGER_PASSWORD) + const tableName = getTableName(nodeData) const additionalConfig = nodeData.inputs?.additionalConfig as string const _namespace = nodeData.inputs?.namespace as string const namespace = _namespace ? _namespace : options.chatflowid @@ -139,11 +147,11 @@ class PostgresRecordManager_RecordManager implements INode { const postgresConnectionOptions = { ...additionalConfiguration, type: 'postgres', - host: nodeData.inputs?.host as string, - port: nodeData.inputs?.port as number, + host: getHost(nodeData), + port: getPort(nodeData), username: user, password: password, - database: nodeData.inputs?.database as string + database: getDatabase(nodeData) } const args = { @@ -162,7 +170,7 @@ class PostgresRecordManager_RecordManager implements INode { type PostgresRecordManagerOptions = { postgresConnectionOptions: any - tableName?: string + tableName: string } class PostgresRecordManager implements RecordManagerInterface { @@ -180,7 +188,7 @@ class PostgresRecordManager implements RecordManagerInterface { const { postgresConnectionOptions, tableName } = config this.namespace = namespace this.datasource = new DataSource(postgresConnectionOptions) - this.tableName = tableName || 'upsertion_records' + this.tableName = tableName } async createSchema(): Promise { diff --git a/packages/components/nodes/recordmanager/PostgresRecordManager/README.md b/packages/components/nodes/recordmanager/PostgresRecordManager/README.md new file mode 100644 index 00000000000..b2a73ded6bf --- /dev/null +++ b/packages/components/nodes/recordmanager/PostgresRecordManager/README.md @@ -0,0 +1,18 @@ +# Postgres Record Manager + +Postgres Record Manager integration for Flowise + +## 🌱 Env Variables + +| Variable | Description | Type | Default | +| ---------------------------- | ----------------------------------------------------------------------------------------------- | ------------------------------------------------ | ----------------------------------- | +| POSTGRES_RECORDMANAGER_HOST | Default `host` for Postgres Record Manager | String | | +| POSTGRES_RECORDMANAGER_PORT | Default `port` for Postgres Record Manager | Number | 5432 | +| POSTGRES_RECORDMANAGER_USER | Default `user` for Postgres Record Manager | String | | +| POSTGRES_RECORDMANAGER_PASSWORD | Default `password` for Postgres Record Manager | String | | +| POSTGRES_RECORDMANAGER_DATABASE | Default `database` for Postgres Record Manager | String | | +| POSTGRES_RECORDMANAGER_TABLE_NAME | Default `tableName` for Postgres Record Manager | String | upsertion_records | + +## License + +Source code in this repository is made available under the [Apache License Version 2.0](https://github.com/FlowiseAI/Flowise/blob/master/LICENSE.md). \ No newline at end of file diff --git a/packages/components/nodes/recordmanager/PostgresRecordManager/utils.ts b/packages/components/nodes/recordmanager/PostgresRecordManager/utils.ts new file mode 100644 index 00000000000..f9a8d9ae06e --- /dev/null +++ b/packages/components/nodes/recordmanager/PostgresRecordManager/utils.ts @@ -0,0 +1,17 @@ +import { defaultChain, INodeData } from '../../../src' + +export function getHost(nodeData?: INodeData) { + return defaultChain(nodeData?.inputs?.host, process.env.POSTGRES_RECORDMANAGER_HOST) +} + +export function getDatabase(nodeData?: INodeData) { + return defaultChain(nodeData?.inputs?.database, process.env.POSTGRES_RECORDMANAGER_DATABASE) +} + +export function getPort(nodeData?: INodeData) { + return defaultChain(nodeData?.inputs?.port, process.env.POSTGRES_RECORDMANAGER_PORT, '5432') +} + +export function getTableName(nodeData?: INodeData) { + return defaultChain(nodeData?.inputs?.tableName, process.env.POSTGRES_RECORDMANAGER_TABLE_NAME, 'upsertion_records') +} diff --git a/packages/components/nodes/vectorstores/Postgres/Postgres.ts b/packages/components/nodes/vectorstores/Postgres/Postgres.ts index d5cdf069fda..1cdacb85fc7 100644 --- a/packages/components/nodes/vectorstores/Postgres/Postgres.ts +++ b/packages/components/nodes/vectorstores/Postgres/Postgres.ts @@ -1,13 +1,16 @@ -import { Pool } from 'pg' import { flatten } from 'lodash' -import { DataSourceOptions } from 'typeorm' -import { Embeddings } from '@langchain/core/embeddings' import { Document } from '@langchain/core/documents' -import { TypeORMVectorStore, TypeORMVectorStoreDocument } from '@langchain/community/vectorstores/typeorm' import { ICommonObject, INode, INodeData, INodeOutputsValue, INodeParams, IndexingResult } from '../../../src/Interface' -import { FLOWISE_CHATID, getBaseClasses, getCredentialData, getCredentialParam } from '../../../src/utils' +import { FLOWISE_CHATID, getBaseClasses } from '../../../src/utils' import { index } from '../../../src/indexing' import { howToUseFileUpload } from '../VectorStoreUtils' +import { VectorStore } from '@langchain/core/vectorstores' +import { VectorStoreDriver } from './driver/Base' +import { TypeORMDriver } from './driver/TypeORM' +import { PGVectorDriver } from './driver/PGVector' +import { getContentColumnName, getDatabase, getHost, getPort, getTableName } from './utils' + +const serverCredentialsExists = !!process.env.POSTGRES_VECTORSTORE_USER && !!process.env.POSTGRES_VECTORSTORE_PASSWORD class Postgres_VectorStores implements INode { label: string @@ -26,7 +29,7 @@ class Postgres_VectorStores implements INode { constructor() { this.label = 'Postgres' this.name = 'postgres' - this.version = 6.0 + this.version = 7.0 this.type = 'Postgres' this.icon = 'postgres.svg' this.category = 'Vector Stores' @@ -36,7 +39,8 @@ class Postgres_VectorStores implements INode { label: 'Connect Credential', name: 'credential', type: 'credential', - credentialNames: ['PostgresApi'] + credentialNames: ['PostgresApi'], + optional: serverCredentialsExists } this.inputs = [ { @@ -61,28 +65,74 @@ class Postgres_VectorStores implements INode { { label: 'Host', name: 'host', - type: 'string' + type: 'string', + placeholder: getHost(), + optional: !!getHost() }, { label: 'Database', name: 'database', - type: 'string' + type: 'string', + placeholder: getDatabase(), + optional: !!getDatabase() }, { label: 'Port', name: 'port', type: 'number', - placeholder: '5432', + placeholder: getPort(), optional: true }, { label: 'Table Name', name: 'tableName', type: 'string', - placeholder: 'documents', + placeholder: getTableName(), additionalParams: true, optional: true }, + { + label: 'Driver', + name: 'driver', + type: 'options', + default: 'typeorm', + description: 'Different option to connect to Postgres', + options: [ + { + label: 'TypeORM', + name: 'typeorm' + }, + { + label: 'PGVector', + name: 'pgvector' + } + ], + optional: true, + additionalParams: true + }, + { + label: 'Distance Strategy', + name: 'distanceStrategy', + description: 'Strategy for calculating distances between vectors', + type: 'options', + options: [ + { + label: 'Cosine', + name: 'cosine' + }, + { + label: 'Euclidean', + name: 'euclidean' + }, + { + label: 'Inner Product', + name: 'innerProduct' + } + ], + additionalParams: true, + default: 'cosine', + optional: true + }, { label: 'File Upload', name: 'fileUpload', @@ -117,6 +167,15 @@ class Postgres_VectorStores implements INode { type: 'json', additionalParams: true, optional: true + }, + { + label: 'Content Column Name', + name: 'contentColumnName', + description: 'Column name to store the text content (PGVector Driver only, others use pageContent)', + type: 'string', + placeholder: getContentColumnName(), + additionalParams: true, + optional: true } ] this.outputs = [ @@ -128,7 +187,7 @@ class Postgres_VectorStores implements INode { { label: 'Postgres Vector Store', name: 'vectorStore', - baseClasses: [this.type, ...getBaseClasses(TypeORMVectorStore)] + baseClasses: [this.type, ...getBaseClasses(VectorStore)] } ] } @@ -136,43 +195,15 @@ class Postgres_VectorStores implements INode { //@ts-ignore vectorStoreMethods = { async upsert(nodeData: INodeData, options: ICommonObject): Promise> { - const credentialData = await getCredentialData(nodeData.credential ?? '', options) - const user = getCredentialParam('user', credentialData, nodeData) - const password = getCredentialParam('password', credentialData, nodeData) - const _tableName = nodeData.inputs?.tableName as string - const tableName = _tableName ? _tableName : 'documents' + const tableName = getTableName(nodeData) const docs = nodeData.inputs?.document as Document[] - const embeddings = nodeData.inputs?.embeddings as Embeddings - const additionalConfig = nodeData.inputs?.additionalConfig as string const recordManager = nodeData.inputs?.recordManager const isFileUploadEnabled = nodeData.inputs?.fileUpload as boolean - - let additionalConfiguration = {} - if (additionalConfig) { - try { - additionalConfiguration = typeof additionalConfig === 'object' ? additionalConfig : JSON.parse(additionalConfig) - } catch (exception) { - throw new Error('Invalid JSON in the Additional Configuration: ' + exception) - } - } - - const postgresConnectionOptions = { - ...additionalConfiguration, - type: 'postgres', - host: nodeData.inputs?.host as string, - port: nodeData.inputs?.port as number, - username: user, - password: password, - database: nodeData.inputs?.database as string - } - - const args = { - postgresConnectionOptions: postgresConnectionOptions as DataSourceOptions, - tableName: tableName - } + const vectorStoreDriver: VectorStoreDriver = Postgres_VectorStores.getDriverFromConfig(nodeData, options) const flattenDocs = docs && docs.length ? flatten(docs) : [] const finalDocs = [] + for (let i = 0; i < flattenDocs.length; i += 1) { if (flattenDocs[i] && flattenDocs[i].pageContent) { if (isFileUploadEnabled && options.chatId) { @@ -184,24 +215,7 @@ class Postgres_VectorStores implements INode { try { if (recordManager) { - const vectorStore = await TypeORMVectorStore.fromDataSource(embeddings, args) - - // Avoid Illegal invocation error - vectorStore.similaritySearchVectorWithScore = async (query: number[], k: number, filter?: any) => { - return await similaritySearchVectorWithScore(query, k, tableName, postgresConnectionOptions, filter) - } - - vectorStore.delete = async (params: { ids: string[] }): Promise => { - const { ids } = params - - if (ids?.length) { - try { - vectorStore.appDataSource.getRepository(vectorStore.documentEntity).delete(ids) - } catch (e) { - console.error('Failed to delete') - } - } - } + const vectorStore = await vectorStoreDriver.instanciate() await recordManager.createSchema() @@ -218,12 +232,7 @@ class Postgres_VectorStores implements INode { return res } else { - const vectorStore = await TypeORMVectorStore.fromDocuments(finalDocs, embeddings, args) - - // Avoid Illegal invocation error - vectorStore.similaritySearchVectorWithScore = async (query: number[], k: number, filter?: any) => { - return await similaritySearchVectorWithScore(query, k, tableName, postgresConnectionOptions, filter) - } + await vectorStoreDriver.fromDocuments(finalDocs) return { numAdded: finalDocs.length, addedDocs: finalDocs } } @@ -232,40 +241,11 @@ class Postgres_VectorStores implements INode { } }, async delete(nodeData: INodeData, ids: string[], options: ICommonObject): Promise { - const credentialData = await getCredentialData(nodeData.credential ?? '', options) - const user = getCredentialParam('user', credentialData, nodeData) - const password = getCredentialParam('password', credentialData, nodeData) - const _tableName = nodeData.inputs?.tableName as string - const tableName = _tableName ? _tableName : 'documents' - const embeddings = nodeData.inputs?.embeddings as Embeddings - const additionalConfig = nodeData.inputs?.additionalConfig as string + const vectorStoreDriver: VectorStoreDriver = Postgres_VectorStores.getDriverFromConfig(nodeData, options) + const tableName = getTableName(nodeData) const recordManager = nodeData.inputs?.recordManager - let additionalConfiguration = {} - if (additionalConfig) { - try { - additionalConfiguration = typeof additionalConfig === 'object' ? additionalConfig : JSON.parse(additionalConfig) - } catch (exception) { - throw new Error('Invalid JSON in the Additional Configuration: ' + exception) - } - } - - const postgresConnectionOptions = { - ...additionalConfiguration, - type: 'postgres', - host: nodeData.inputs?.host as string, - port: nodeData.inputs?.port as number, - username: user, - password: password, - database: nodeData.inputs?.database as string - } - - const args = { - postgresConnectionOptions: postgresConnectionOptions as DataSourceOptions, - tableName: tableName - } - - const vectorStore = await TypeORMVectorStore.fromDataSource(embeddings, args) + const vectorStore = await vectorStoreDriver.instanciate() try { if (recordManager) { @@ -286,13 +266,7 @@ class Postgres_VectorStores implements INode { } async init(nodeData: INodeData, _: string, options: ICommonObject): Promise { - const credentialData = await getCredentialData(nodeData.credential ?? '', options) - const user = getCredentialParam('user', credentialData, nodeData) - const password = getCredentialParam('password', credentialData, nodeData) - const _tableName = nodeData.inputs?.tableName as string - const tableName = _tableName ? _tableName : 'documents' - const embeddings = nodeData.inputs?.embeddings as Embeddings - const additionalConfig = nodeData.inputs?.additionalConfig as string + const vectorStoreDriver: VectorStoreDriver = Postgres_VectorStores.getDriverFromConfig(nodeData, options) const output = nodeData.outputs?.output as string const topK = nodeData.inputs?.topK as string const k = topK ? parseFloat(topK) : 4 @@ -304,50 +278,13 @@ class Postgres_VectorStores implements INode { pgMetadataFilter = typeof _pgMetadataFilter === 'object' ? _pgMetadataFilter : JSON.parse(_pgMetadataFilter) } if (isFileUploadEnabled && options.chatId) { - pgMetadataFilter = pgMetadataFilter || {} pgMetadataFilter = { - ...pgMetadataFilter, - [FLOWISE_CHATID]: options.chatId, - $notexists: FLOWISE_CHATID // special filter to check if the field does not exist + ...(pgMetadataFilter || {}), + [FLOWISE_CHATID]: options.chatId } } - let additionalConfiguration = {} - if (additionalConfig) { - try { - additionalConfiguration = typeof additionalConfig === 'object' ? additionalConfig : JSON.parse(additionalConfig) - } catch (exception) { - throw new Error('Invalid JSON in the Additional Configuration: ' + exception) - } - } - - const postgresConnectionOptions = { - ...additionalConfiguration, - type: 'postgres', - host: nodeData.inputs?.host as string, - port: nodeData.inputs?.port as number, - username: user, // Required by TypeORMVectorStore - user: user, // Required by Pool in similaritySearchVectorWithScore - password: password, - database: nodeData.inputs?.database as string - } - - const args = { - postgresConnectionOptions: postgresConnectionOptions as DataSourceOptions, - tableName: tableName - } - - const vectorStore = await TypeORMVectorStore.fromDataSource(embeddings, args) - - // Rewrite the method to use pg pool connection instead of the default connection - /* Otherwise a connection error is displayed when the chain tries to execute the function - [chain/start] [1:chain:ConversationalRetrievalQAChain] Entering Chain run with input: { "question": "what the document is about", "chat_history": [] } - [retriever/start] [1:chain:ConversationalRetrievalQAChain > 2:retriever:VectorStoreRetriever] Entering Retriever run with input: { "query": "what the document is about" } - [ERROR]: uncaughtException: Illegal invocation TypeError: Illegal invocation at Socket.ref (node:net:1524:18) at Connection.ref (.../node_modules/pg/lib/connection.js:183:17) at Client.ref (.../node_modules/pg/lib/client.js:591:21) at BoundPool._pulseQueue (/node_modules/pg-pool/index.js:148:28) at .../node_modules/pg-pool/index.js:184:37 at process.processTicksAndRejections (node:internal/process/task_queues:77:11) - */ - vectorStore.similaritySearchVectorWithScore = async (query: number[], k: number, filter?: any) => { - return await similaritySearchVectorWithScore(query, k, tableName, postgresConnectionOptions, filter ?? pgMetadataFilter) - } + const vectorStore = await vectorStoreDriver.instanciate(pgMetadataFilter) if (output === 'retriever') { const retriever = vectorStore.asRetriever(k) @@ -361,51 +298,17 @@ class Postgres_VectorStores implements INode { } return vectorStore } -} - -const similaritySearchVectorWithScore = async ( - query: number[], - k: number, - tableName: string, - postgresConnectionOptions: ICommonObject, - filter?: any -) => { - const embeddingString = `[${query.join(',')}]` - let _filter = '{}' - let notExists = '' - if (filter && typeof filter === 'object') { - if (filter.$notexists) { - notExists = `OR NOT (metadata ? '${filter.$notexists}')` - delete filter.$notexists - } - _filter = JSON.stringify(filter) - } - - const queryString = ` - SELECT *, embedding <=> $1 as "_distance" - FROM ${tableName} - WHERE metadata @> $2 - ${notExists} - ORDER BY "_distance" ASC - LIMIT $3;` - - const pool = new Pool(postgresConnectionOptions) - const conn = await pool.connect() - const documents = await conn.query(queryString, [embeddingString, _filter, k]) - - conn.release() - - const results = [] as [TypeORMVectorStoreDocument, number][] - for (const doc of documents.rows) { - if (doc._distance != null && doc.pageContent != null) { - const document = new Document(doc) as TypeORMVectorStoreDocument - document.id = doc.id - results.push([document, doc._distance]) + static getDriverFromConfig(nodeData: INodeData, options: ICommonObject): VectorStoreDriver { + switch (nodeData.inputs?.driver) { + case 'typeorm': + return new TypeORMDriver(nodeData, options) + case 'pgvector': + return new PGVectorDriver(nodeData, options) + default: + return new TypeORMDriver(nodeData, options) } } - - return results } module.exports = { nodeClass: Postgres_VectorStores } diff --git a/packages/components/nodes/vectorstores/Postgres/README.md b/packages/components/nodes/vectorstores/Postgres/README.md new file mode 100644 index 00000000000..84e31b0a7c0 --- /dev/null +++ b/packages/components/nodes/vectorstores/Postgres/README.md @@ -0,0 +1,19 @@ +# Postgres Vector Store + +Postgres Vector Store integration for Flowise + +## 🌱 Env Variables + +| Variable | Description | Type | Default | +| ---------------------------- | ----------------------------------------------------------------------------------------------- | ------------------------------------------------ | ----------------------------------- | +| POSTGRES_VECTORSTORE_HOST | Default `host` for Postgres Vector Store | String | | +| POSTGRES_VECTORSTORE_PORT | Default `port` for Postgres Vector Store | Number | 5432 | +| POSTGRES_VECTORSTORE_USER | Default `user` for Postgres Vector Store | String | | +| POSTGRES_VECTORSTORE_PASSWORD | Default `password` for Postgres Vector Store | String | | +| POSTGRES_VECTORSTORE_DATABASE | Default `database` for Postgres Vector Store | String | | +| POSTGRES_VECTORSTORE_TABLE_NAME | Default `tableName` for Postgres Vector Store | String | documents | +| POSTGRES_VECTORSTORE_CONTENT_COLUMN_NAME | Default `contentColumnName` for Postgres Vector Store | String | pageContent | + +## License + +Source code in this repository is made available under the [Apache License Version 2.0](https://github.com/FlowiseAI/Flowise/blob/master/LICENSE.md). \ No newline at end of file diff --git a/packages/components/nodes/vectorstores/Postgres/driver/Base.ts b/packages/components/nodes/vectorstores/Postgres/driver/Base.ts new file mode 100644 index 00000000000..1616c71f7d4 --- /dev/null +++ b/packages/components/nodes/vectorstores/Postgres/driver/Base.ts @@ -0,0 +1,48 @@ +import { VectorStore } from '@langchain/core/vectorstores' +import { getCredentialData, getCredentialParam, ICommonObject, INodeData } from '../../../../src' +import { Document } from '@langchain/core/documents' +import { Embeddings } from '@langchain/core/embeddings' +import { getDatabase, getHost, getPort, getTableName } from '../utils' + +export abstract class VectorStoreDriver { + constructor(protected nodeData: INodeData, protected options: ICommonObject) {} + + abstract instanciate(metaDataFilters?: any): Promise + + abstract fromDocuments(documents: Document[]): Promise + + protected async adaptInstance(instance: VectorStore, _metaDataFilters?: any): Promise { + return instance + } + + getHost() { + return getHost(this.nodeData) as string + } + + getPort() { + return getPort(this.nodeData) as number + } + + getDatabase() { + return getDatabase(this.nodeData) as string + } + + getTableName() { + return getTableName(this.nodeData) + } + + getEmbeddings() { + return this.nodeData.inputs?.embeddings as Embeddings + } + + async getCredentials() { + const credentialData = await getCredentialData(this.nodeData.credential ?? '', this.options) + const user = getCredentialParam('user', credentialData, this.nodeData, process.env.POSTGRES_VECTORSTORE_USER) + const password = getCredentialParam('password', credentialData, this.nodeData, process.env.POSTGRES_VECTORSTORE_PASSWORD) + + return { + user, + password + } + } +} diff --git a/packages/components/nodes/vectorstores/Postgres/driver/PGVector.ts b/packages/components/nodes/vectorstores/Postgres/driver/PGVector.ts new file mode 100644 index 00000000000..39ec62ad9e7 --- /dev/null +++ b/packages/components/nodes/vectorstores/Postgres/driver/PGVector.ts @@ -0,0 +1,117 @@ +import { VectorStoreDriver } from './Base' +import { FLOWISE_CHATID } from '../../../../src' +import { DistanceStrategy, PGVectorStore, PGVectorStoreArgs } from '@langchain/community/vectorstores/pgvector' +import { Document } from '@langchain/core/documents' +import { PoolConfig } from 'pg' +import { getContentColumnName } from '../utils' + +export class PGVectorDriver extends VectorStoreDriver { + static CONTENT_COLUMN_NAME_DEFAULT: string = 'pageContent' + + protected _postgresConnectionOptions: PoolConfig + + protected async getPostgresConnectionOptions() { + if (!this._postgresConnectionOptions) { + const { user, password } = await this.getCredentials() + const additionalConfig = this.nodeData.inputs?.additionalConfig as string + + let additionalConfiguration = {} + + if (additionalConfig) { + try { + additionalConfiguration = typeof additionalConfig === 'object' ? additionalConfig : JSON.parse(additionalConfig) + } catch (exception) { + throw new Error('Invalid JSON in the Additional Configuration: ' + exception) + } + } + + this._postgresConnectionOptions = { + ...additionalConfiguration, + host: this.getHost(), + port: this.getPort(), + user: user, + password: password, + database: this.getDatabase() + } + } + + return this._postgresConnectionOptions + } + + async getArgs(): Promise { + return { + postgresConnectionOptions: await this.getPostgresConnectionOptions(), + tableName: this.getTableName(), + columns: { + contentColumnName: getContentColumnName(this.nodeData) + }, + distanceStrategy: (this.nodeData.inputs?.distanceStrategy || 'cosine') as DistanceStrategy + } + } + + async instanciate(metadataFilters?: any) { + return this.adaptInstance(await PGVectorStore.initialize(this.getEmbeddings(), await this.getArgs()), metadataFilters) + } + + async fromDocuments(documents: Document[]) { + const instance = await this.instanciate() + + await instance.addDocuments(documents) + + return this.adaptInstance(instance) + } + + protected async adaptInstance(instance: PGVectorStore, metadataFilters?: any): Promise { + const { [FLOWISE_CHATID]: chatId, ...pgMetadataFilter } = metadataFilters || {} + + const baseSimilaritySearchVectorWithScoreFn = instance.similaritySearchVectorWithScore.bind(instance) + + instance.similaritySearchVectorWithScore = async (query, k, filter) => { + return await baseSimilaritySearchVectorWithScoreFn(query, k, filter ?? pgMetadataFilter) + } + + const basePoolQueryFn = instance.pool.query.bind(instance.pool) + + // @ts-ignore + instance.pool.query = async (queryString: string, parameters: any[]) => { + if (!instance.client) { + instance.client = await instance.pool.connect() + } + + const whereClauseRegex = /WHERE ([^\n]+)/ + let chatflowOr = '' + + // Match chatflow uploaded file and keep filtering on other files: + // https://github.com/FlowiseAI/Flowise/pull/3367#discussion_r1804229295 + if (chatId) { + parameters.push({ [FLOWISE_CHATID]: chatId }) + + chatflowOr = `OR metadata @> $${parameters.length}` + } + + if (queryString.match(whereClauseRegex)) { + queryString = queryString.replace(whereClauseRegex, `WHERE (($1) AND NOT (metadata ? '${FLOWISE_CHATID}')) ${chatflowOr}`) + } else { + const orderByClauseRegex = /ORDER BY (.*)/ + // Insert WHERE clause before ORDER BY + queryString = queryString.replace( + orderByClauseRegex, + `WHERE (metadata @> '{}' AND NOT (metadata ? '${FLOWISE_CHATID}')) ${chatflowOr} + ORDER BY $1 + ` + ) + } + + // Run base function + const queryResult = await basePoolQueryFn(queryString, parameters) + + // ensure connection is released + instance.client.release() + instance.client = undefined + + return queryResult + } + + return instance + } +} diff --git a/packages/components/nodes/vectorstores/Postgres/driver/TypeORM.ts b/packages/components/nodes/vectorstores/Postgres/driver/TypeORM.ts new file mode 100644 index 00000000000..0217713b13f --- /dev/null +++ b/packages/components/nodes/vectorstores/Postgres/driver/TypeORM.ts @@ -0,0 +1,169 @@ +import { DataSourceOptions } from 'typeorm' +import { VectorStoreDriver } from './Base' +import { FLOWISE_CHATID, ICommonObject } from '../../../../src' +import { TypeORMVectorStore, TypeORMVectorStoreArgs, TypeORMVectorStoreDocument } from '@langchain/community/vectorstores/typeorm' +import { VectorStore } from '@langchain/core/vectorstores' +import { Document } from '@langchain/core/documents' +import { Pool } from 'pg' + +export class TypeORMDriver extends VectorStoreDriver { + protected _postgresConnectionOptions: DataSourceOptions + + protected async getPostgresConnectionOptions() { + if (!this._postgresConnectionOptions) { + const { user, password } = await this.getCredentials() + const additionalConfig = this.nodeData.inputs?.additionalConfig as string + + let additionalConfiguration = {} + + if (additionalConfig) { + try { + additionalConfiguration = typeof additionalConfig === 'object' ? additionalConfig : JSON.parse(additionalConfig) + } catch (exception) { + throw new Error('Invalid JSON in the Additional Configuration: ' + exception) + } + } + + this._postgresConnectionOptions = { + ...additionalConfiguration, + type: 'postgres', + host: this.getHost(), + port: this.getPort(), + username: user, // Required by TypeORMVectorStore + user: user, // Required by Pool in similaritySearchVectorWithScore + password: password, + database: this.getDatabase() + } as DataSourceOptions + } + return this._postgresConnectionOptions + } + + async getArgs(): Promise { + return { + postgresConnectionOptions: await this.getPostgresConnectionOptions(), + tableName: this.getTableName() + } + } + + async instanciate(metadataFilters?: any) { + return this.adaptInstance(await TypeORMVectorStore.fromDataSource(this.getEmbeddings(), await this.getArgs()), metadataFilters) + } + + async fromDocuments(documents: Document[]) { + return this.adaptInstance(await TypeORMVectorStore.fromDocuments(documents, this.getEmbeddings(), await this.getArgs())) + } + + sanitizeDocuments(documents: Document[]) { + // Remove NULL characters which triggers error on PG + for (var i in documents) { + documents[i].pageContent = documents[i].pageContent.replace(/\0/g, '') + } + + return documents + } + + protected async adaptInstance(instance: TypeORMVectorStore, metadataFilters?: any): Promise { + const tableName = this.getTableName() + + // Rewrite the method to use pg pool connection instead of the default connection + /* Otherwise a connection error is displayed when the chain tries to execute the function + [chain/start] [1:chain:ConversationalRetrievalQAChain] Entering Chain run with input: { "question": "what the document is about", "chat_history": [] } + [retriever/start] [1:chain:ConversationalRetrievalQAChain > 2:retriever:VectorStoreRetriever] Entering Retriever run with input: { "query": "what the document is about" } + [ERROR]: uncaughtException: Illegal invocation TypeError: Illegal invocation at Socket.ref (node:net:1524:18) at Connection.ref (.../node_modules/pg/lib/connection.js:183:17) at Client.ref (.../node_modules/pg/lib/client.js:591:21) at BoundPool._pulseQueue (/node_modules/pg-pool/index.js:148:28) at .../node_modules/pg-pool/index.js:184:37 at process.processTicksAndRejections (node:internal/process/task_queues:77:11) + */ + instance.similaritySearchVectorWithScore = async (query: number[], k: number, filter?: any) => { + return await TypeORMDriver.similaritySearchVectorWithScore( + query, + k, + tableName, + await this.getPostgresConnectionOptions(), + filter ?? metadataFilters, + this.computedOperatorString + ) + } + + instance.delete = async (params: { ids: string[] }): Promise => { + const { ids } = params + + if (ids?.length) { + try { + instance.appDataSource.getRepository(instance.documentEntity).delete(ids) + } catch (e) { + console.error('Failed to delete') + } + } + } + + const baseAddVectorsFn = instance.addVectors.bind(instance) + + instance.addVectors = async (vectors, documents) => { + return baseAddVectorsFn(vectors, this.sanitizeDocuments(documents)) + } + + return instance + } + + get computedOperatorString() { + const { distanceStrategy = 'cosine' } = this.nodeData.inputs || {} + + switch (distanceStrategy) { + case 'cosine': + return '<=>' + case 'innerProduct': + return '<#>' + case 'euclidean': + return '<->' + default: + throw new Error(`Unknown distance strategy: ${distanceStrategy}`) + } + } + + static similaritySearchVectorWithScore = async ( + query: number[], + k: number, + tableName: string, + postgresConnectionOptions: ICommonObject, + filter?: any, + distanceOperator: string = '<=>' + ) => { + const embeddingString = `[${query.join(',')}]` + let chatflowOr = '' + const { [FLOWISE_CHATID]: chatId, ...restFilters } = filter || {} + + const _filter = JSON.stringify(restFilters || {}) + const parameters: any[] = [embeddingString, _filter, k] + + // Match chatflow uploaded file and keep filtering on other files: + // https://github.com/FlowiseAI/Flowise/pull/3367#discussion_r1804229295 + if (chatId) { + parameters.push({ [FLOWISE_CHATID]: chatId }) + chatflowOr = `OR metadata @> $${parameters.length}` + } + + const queryString = ` + SELECT *, embedding ${distanceOperator} $1 as "_distance" + FROM ${tableName} + WHERE ((metadata @> $2) AND NOT (metadata ? '${FLOWISE_CHATID}')) ${chatflowOr} + ORDER BY "_distance" ASC + LIMIT $3;` + + const pool = new Pool(postgresConnectionOptions) + + const conn = await pool.connect() + + const documents = await conn.query(queryString, parameters) + + conn.release() + + const results = [] as [TypeORMVectorStoreDocument, number][] + for (const doc of documents.rows) { + if (doc._distance != null && doc.pageContent != null) { + const document = new Document(doc) as TypeORMVectorStoreDocument + document.id = doc.id + results.push([document, doc._distance]) + } + } + + return results + } +} diff --git a/packages/components/nodes/vectorstores/Postgres/utils.ts b/packages/components/nodes/vectorstores/Postgres/utils.ts new file mode 100644 index 00000000000..e2b18b57075 --- /dev/null +++ b/packages/components/nodes/vectorstores/Postgres/utils.ts @@ -0,0 +1,21 @@ +import { defaultChain, INodeData } from '../../../src' + +export function getHost(nodeData?: INodeData) { + return defaultChain(nodeData?.inputs?.host, process.env.POSTGRES_VECTORSTORE_HOST) +} + +export function getDatabase(nodeData?: INodeData) { + return defaultChain(nodeData?.inputs?.database, process.env.POSTGRES_VECTORSTORE_DATABASE) +} + +export function getPort(nodeData?: INodeData) { + return defaultChain(nodeData?.inputs?.port, process.env.POSTGRES_VECTORSTORE_PORT, '5432') +} + +export function getTableName(nodeData?: INodeData) { + return defaultChain(nodeData?.inputs?.tableName, process.env.POSTGRES_VECTORSTORE_TABLE_NAME, 'documents') +} + +export function getContentColumnName(nodeData?: INodeData) { + return defaultChain(nodeData?.inputs?.contentColumnName, process.env.POSTGRES_VECTORSTORE_CONTENT_COLUMN_NAME, 'pageContent') +} diff --git a/packages/components/src/utils.ts b/packages/components/src/utils.ts index b57191abbe2..64b20a47f33 100644 --- a/packages/components/src/utils.ts +++ b/packages/components/src/utils.ts @@ -542,8 +542,19 @@ export const getCredentialData = async (selectedCredentialId: string, options: I } } -export const getCredentialParam = (paramName: string, credentialData: ICommonObject, nodeData: INodeData): any => { - return (nodeData.inputs as ICommonObject)[paramName] ?? credentialData[paramName] ?? undefined +/** + * Get first non falsy value + * + * @param {...any} values + * + * @returns {any|undefined} + */ +export const defaultChain = (...values: any[]): any | undefined => { + return values.filter(Boolean)[0] +} + +export const getCredentialParam = (paramName: string, credentialData: ICommonObject, nodeData: INodeData, defaultValue?: any): any => { + return (nodeData.inputs as ICommonObject)[paramName] ?? credentialData[paramName] ?? defaultValue ?? undefined } // reference https://www.freeformatter.com/json-escape.html