From 5f6bf35f5fdcc3fa0e2501ed001097b389edd345 Mon Sep 17 00:00:00 2001 From: Eyalm321 <145741922+Eyalm321@users.noreply.github.com> Date: Fri, 23 Aug 2024 12:52:54 -0500 Subject: [PATCH 1/6] added tls support --- .../nodes/vectorstores/Milvus/Milvus.ts | 314 +++++++++++------- 1 file changed, 186 insertions(+), 128 deletions(-) diff --git a/packages/components/nodes/vectorstores/Milvus/Milvus.ts b/packages/components/nodes/vectorstores/Milvus/Milvus.ts index 74cf4717332..935d91e752f 100644 --- a/packages/components/nodes/vectorstores/Milvus/Milvus.ts +++ b/packages/components/nodes/vectorstores/Milvus/Milvus.ts @@ -1,45 +1,45 @@ -import { flatten } from 'lodash' -import { DataType, ErrorCode, MetricType, IndexType } from '@zilliz/milvus2-sdk-node' -import { Document } from '@langchain/core/documents' -import { MilvusLibArgs, Milvus } from '@langchain/community/vectorstores/milvus' -import { Embeddings } from '@langchain/core/embeddings' -import { ICommonObject, INode, INodeData, INodeOutputsValue, INodeParams, IndexingResult } from '../../../src/Interface' -import { getBaseClasses, getCredentialData, getCredentialParam } from '../../../src/utils' +import { flatten } from 'lodash'; +import { DataType, ErrorCode, MetricType, IndexType } from '@zilliz/milvus2-sdk-node'; +import { Document } from '@langchain/core/documents'; +import { MilvusLibArgs, Milvus } from '@langchain/community/vectorstores/milvus'; +import { Embeddings } from '@langchain/core/embeddings'; +import { ICommonObject, INode, INodeData, INodeOutputsValue, INodeParams, IndexingResult } from '../../../src/Interface'; +import { getBaseClasses, getCredentialData, getCredentialParam } from '../../../src/utils'; interface InsertRow { - [x: string]: string | number[] + [x: string]: string | number[]; } class Milvus_VectorStores implements INode { - label: string - name: string - version: number - description: string - type: string - icon: string - category: string - badge: string - baseClasses: string[] - inputs: INodeParams[] - credential: INodeParams - outputs: INodeOutputsValue[] + label: string; + name: string; + version: number; + description: string; + type: string; + icon: string; + category: string; + badge: string; + baseClasses: string[]; + inputs: INodeParams[]; + credential: INodeParams; + outputs: INodeOutputsValue[]; constructor() { - this.label = 'Milvus' - this.name = 'milvus' - this.version = 1.0 - this.type = 'Milvus' - this.icon = 'milvus.svg' - this.category = 'Vector Stores' - this.description = `Upsert embedded data and perform similarity search upon query using Milvus, world's most advanced open-source vector database` - this.baseClasses = [this.type, 'VectorStoreRetriever', 'BaseRetriever'] + this.label = 'Milvus'; + this.name = 'milvus'; + this.version = 1.0; + this.type = 'Milvus'; + this.icon = 'milvus.svg'; + this.category = 'Vector Stores'; + this.description = `Upsert embedded data and perform similarity search upon query using Milvus, world's most advanced open-source vector database`; + this.baseClasses = [this.type, 'VectorStoreRetriever', 'BaseRetriever']; this.credential = { label: 'Connect Credential', name: 'credential', type: 'credential', optional: true, credentialNames: ['milvusAuth'] - } + }; this.inputs = [ { label: 'Document', @@ -90,8 +90,49 @@ class Milvus_VectorStores implements INode { type: 'number', additionalParams: true, optional: true - } - ] + }, + { + label: 'Secure', + name: 'secure', + type: 'boolean', + optional: true, + description: 'Enable secure connection to Milvus server', + additionalParams: true + }, + { + label: 'Client PEM Path', + name: 'clientPemPath', + type: 'string', + optional: true, + description: 'Path to the client PEM file', + additionalParams: true + }, + { + label: 'Client Key Path', + name: 'clientKeyPath', + type: 'string', + optional: true, + description: 'Path to the client key file', + additionalParams: true + }, + { + label: 'CA PEM Path', + name: 'caPemPath', + type: 'string', + optional: true, + description: 'Path to the root PEM file', + additionalParams: true + }, + { + label: 'Server Name', + name: 'serverName', + type: 'string', + optional: true, + description: 'Server name for the secure connection', + additionalParams: true + }, + + ]; this.outputs = [ { label: 'Milvus Retriever', @@ -103,150 +144,167 @@ class Milvus_VectorStores implements INode { name: 'vectorStore', baseClasses: [this.type, ...getBaseClasses(Milvus)] } - ] + ]; } //@ts-ignore vectorStoreMethods = { async upsert(nodeData: INodeData, options: ICommonObject): Promise> { // server setup - const address = nodeData.inputs?.milvusServerUrl as string - const collectionName = nodeData.inputs?.milvusCollection as string + const address = nodeData.inputs?.milvusServerUrl as string; + const collectionName = nodeData.inputs?.milvusCollection as string; // embeddings - const docs = nodeData.inputs?.document as Document[] - const embeddings = nodeData.inputs?.embeddings as Embeddings + const docs = nodeData.inputs?.document as Document[]; + const embeddings = nodeData.inputs?.embeddings as Embeddings; // credential - const credentialData = await getCredentialData(nodeData.credential ?? '', options) - const milvusUser = getCredentialParam('milvusUser', credentialData, nodeData) - const milvusPassword = getCredentialParam('milvusPassword', credentialData, nodeData) + const credentialData = await getCredentialData(nodeData.credential ?? '', options); + const milvusUser = getCredentialParam('milvusUser', credentialData, nodeData); + const milvusPassword = getCredentialParam('milvusPassword', credentialData, nodeData); + + // tls + const secure = nodeData.inputs?.secure as boolean; + const clientPemPath = nodeData.inputs?.clientPemPath as string; + const clientKeyPath = nodeData.inputs?.clientKeyPath as string; + const caPemPath = nodeData.inputs?.caPemPath as string; + const serverName = nodeData.inputs?.serverName as string; // init MilvusLibArgs const milVusArgs: MilvusLibArgs = { url: address, - collectionName: collectionName - } + collectionName: collectionName, + clientConfig: { + address: address, + ssl: secure, + tls: { + rootCertPath: caPemPath, + privateKeyPath: clientKeyPath, + certChainPath: clientPemPath, + serverName: serverName + } + } + }; - if (milvusUser) milVusArgs.username = milvusUser - if (milvusPassword) milVusArgs.password = milvusPassword + if (milvusUser) milVusArgs.username = milvusUser; + if (milvusPassword) milVusArgs.password = milvusPassword; - const flattenDocs = docs && docs.length ? flatten(docs) : [] - const finalDocs = [] + const flattenDocs = docs && docs.length ? flatten(docs) : []; + const finalDocs = []; for (let i = 0; i < flattenDocs.length; i += 1) { if (flattenDocs[i] && flattenDocs[i].pageContent) { - finalDocs.push(new Document(flattenDocs[i])) + finalDocs.push(new Document(flattenDocs[i])); } } try { - const vectorStore = await MilvusUpsert.fromDocuments(finalDocs, embeddings, milVusArgs) + const vectorStore = await MilvusUpsert.fromDocuments(finalDocs, embeddings, milVusArgs); // Avoid Illegal Invocation vectorStore.similaritySearchVectorWithScore = async (query: number[], k: number, filter?: string) => { - return await similaritySearchVectorWithScore(query, k, vectorStore, undefined, filter) - } + return await similaritySearchVectorWithScore(query, k, vectorStore, undefined, filter); + }; - return { numAdded: finalDocs.length, addedDocs: finalDocs } + return { numAdded: finalDocs.length, addedDocs: finalDocs }; } catch (e) { - throw new Error(e) + throw new Error(e); } } - } + }; async init(nodeData: INodeData, _: string, options: ICommonObject): Promise { // server setup - const address = nodeData.inputs?.milvusServerUrl as string - const collectionName = nodeData.inputs?.milvusCollection as string - const milvusFilter = nodeData.inputs?.milvusFilter as string - const textField = nodeData.inputs?.milvusTextField as string + const address = nodeData.inputs?.milvusServerUrl as string; + const collectionName = nodeData.inputs?.milvusCollection as string; + const milvusFilter = nodeData.inputs?.milvusFilter as string; + const textField = nodeData.inputs?.milvusTextField as string; // embeddings - const embeddings = nodeData.inputs?.embeddings as Embeddings - const topK = nodeData.inputs?.topK as string + const embeddings = nodeData.inputs?.embeddings as Embeddings; + const topK = nodeData.inputs?.topK as string; // output - const output = nodeData.outputs?.output as string + const output = nodeData.outputs?.output as string; // format data - const k = topK ? parseFloat(topK) : 4 + const k = topK ? parseFloat(topK) : 4; // credential - const credentialData = await getCredentialData(nodeData.credential ?? '', options) - const milvusUser = getCredentialParam('milvusUser', credentialData, nodeData) - const milvusPassword = getCredentialParam('milvusPassword', credentialData, nodeData) + const credentialData = await getCredentialData(nodeData.credential ?? '', options); + const milvusUser = getCredentialParam('milvusUser', credentialData, nodeData); + const milvusPassword = getCredentialParam('milvusPassword', credentialData, nodeData); // init MilvusLibArgs const milVusArgs: MilvusLibArgs = { url: address, collectionName: collectionName, textField: textField - } + }; - if (milvusUser) milVusArgs.username = milvusUser - if (milvusPassword) milVusArgs.password = milvusPassword + if (milvusUser) milVusArgs.username = milvusUser; + if (milvusPassword) milVusArgs.password = milvusPassword; - const vectorStore = await Milvus.fromExistingCollection(embeddings, milVusArgs) + const vectorStore = await Milvus.fromExistingCollection(embeddings, milVusArgs); // Avoid Illegal Invocation vectorStore.similaritySearchVectorWithScore = async (query: number[], k: number, filter?: string) => { - return await similaritySearchVectorWithScore(query, k, vectorStore, milvusFilter, filter) - } + return await similaritySearchVectorWithScore(query, k, vectorStore, milvusFilter, filter); + }; if (output === 'retriever') { - const retriever = vectorStore.asRetriever(k) - return retriever + const retriever = vectorStore.asRetriever(k); + return retriever; } else if (output === 'vectorStore') { - ;(vectorStore as any).k = k + ; (vectorStore as any).k = k; if (milvusFilter) { - ;(vectorStore as any).filter = milvusFilter + ; (vectorStore as any).filter = milvusFilter; } - return vectorStore + return vectorStore; } - return vectorStore + return vectorStore; } } -const checkJsonString = (value: string): { isJson: boolean; obj: any } => { +const checkJsonString = (value: string): { isJson: boolean; obj: any; } => { try { - const result = JSON.parse(value) - return { isJson: true, obj: result } + const result = JSON.parse(value); + return { isJson: true, obj: result }; } catch (e) { - return { isJson: false, obj: null } + return { isJson: false, obj: null }; } -} +}; const similaritySearchVectorWithScore = async (query: number[], k: number, vectorStore: Milvus, milvusFilter?: string, filter?: string) => { const hasColResp = await vectorStore.client.hasCollection({ collection_name: vectorStore.collectionName - }) + }); if (hasColResp.status.error_code !== ErrorCode.SUCCESS) { - throw new Error(`Error checking collection: ${hasColResp}`) + throw new Error(`Error checking collection: ${hasColResp}`); } if (hasColResp.value === false) { - throw new Error(`Collection not found: ${vectorStore.collectionName}, please create collection before search.`) + throw new Error(`Collection not found: ${vectorStore.collectionName}, please create collection before search.`); } - const filterStr = milvusFilter ?? filter ?? '' + const filterStr = milvusFilter ?? filter ?? ''; - await vectorStore.grabCollectionFields() + await vectorStore.grabCollectionFields(); const loadResp = await vectorStore.client.loadCollectionSync({ collection_name: vectorStore.collectionName - }) + }); if (loadResp.error_code !== ErrorCode.SUCCESS) { - throw new Error(`Error loading collection: ${loadResp}`) + throw new Error(`Error loading collection: ${loadResp}`); } - const outputFields = vectorStore.fields.filter((field) => field !== vectorStore.vectorField) + const outputFields = vectorStore.fields.filter((field) => field !== vectorStore.vectorField); const search_params: any = { anns_field: vectorStore.vectorField, topk: k.toString(), metric_type: vectorStore.indexCreateParams.metric_type, params: vectorStore.indexSearchParams - } + }; const searchResp = await vectorStore.client.search({ collection_name: vectorStore.collectionName, search_params, @@ -254,49 +312,49 @@ const similaritySearchVectorWithScore = async (query: number[], k: number, vecto vector_type: DataType.FloatVector, vectors: [query], filter: filterStr - }) + }); if (searchResp.status.error_code !== ErrorCode.SUCCESS) { - throw new Error(`Error searching data: ${JSON.stringify(searchResp)}`) + throw new Error(`Error searching data: ${JSON.stringify(searchResp)}`); } - const results: [Document, number][] = [] + const results: [Document, number][] = []; searchResp.results.forEach((result) => { const fields = { pageContent: '', metadata: {} as Record - } + }; Object.keys(result).forEach((key) => { if (key === vectorStore.textField) { - fields.pageContent = result[key] + fields.pageContent = result[key]; } else if (vectorStore.fields.includes(key) || key === vectorStore.primaryField) { if (typeof result[key] === 'string') { - const { isJson, obj } = checkJsonString(result[key]) - fields.metadata[key] = isJson ? obj : result[key] + const { isJson, obj } = checkJsonString(result[key]); + fields.metadata[key] = isJson ? obj : result[key]; } else { - fields.metadata[key] = result[key] + fields.metadata[key] = result[key]; } } - }) - results.push([new Document(fields), result.score]) - }) - return results -} + }); + results.push([new Document(fields), result.score]); + }); + return results; +}; class MilvusUpsert extends Milvus { async addVectors(vectors: number[][], documents: Document[]): Promise { if (vectors.length === 0) { - return + return; } - await this.ensureCollection(vectors, documents) + await this.ensureCollection(vectors, documents); - const insertDatas: InsertRow[] = [] + const insertDatas: InsertRow[] = []; for (let index = 0; index < vectors.length; index++) { - const vec = vectors[index] - const doc = documents[index] + const vec = vectors[index]; + const doc = documents[index]; const data: InsertRow = { [this.textField]: doc.pageContent, [this.vectorField]: vec - } + }; this.fields.forEach((field) => { switch (field) { case this.primaryField: @@ -304,35 +362,35 @@ class MilvusUpsert extends Milvus { if (doc.metadata[this.primaryField] === undefined) { throw new Error( `The Collection's primaryField is configured with autoId=false, thus its value must be provided through metadata.` - ) + ); } - data[field] = doc.metadata[this.primaryField] + data[field] = doc.metadata[this.primaryField]; } - break + break; case this.textField: - data[field] = doc.pageContent - break + data[field] = doc.pageContent; + break; case this.vectorField: - data[field] = vec - break + data[field] = vec; + break; default: // metadata fields if (doc.metadata[field] === undefined) { - throw new Error(`The field "${field}" is not provided in documents[${index}].metadata.`) + throw new Error(`The field "${field}" is not provided in documents[${index}].metadata.`); } else if (typeof doc.metadata[field] === 'object') { - data[field] = JSON.stringify(doc.metadata[field]) + data[field] = JSON.stringify(doc.metadata[field]); } else { - data[field] = doc.metadata[field] + data[field] = doc.metadata[field]; } - break + break; } - }) + }); - insertDatas.push(data) + insertDatas.push(data); } const descIndexResp = await this.client.describeIndex({ collection_name: this.collectionName - }) + }); if (descIndexResp.status.error_code === ErrorCode.IndexNotExist) { const resp = await this.client.createIndex({ @@ -341,23 +399,23 @@ class MilvusUpsert extends Milvus { index_name: `myindex_${Date.now().toString()}`, index_type: IndexType.AUTOINDEX, metric_type: MetricType.L2 - }) + }); if (resp.error_code !== ErrorCode.SUCCESS) { - throw new Error(`Error creating index`) + throw new Error(`Error creating index`); } } const insertResp = await this.client.insert({ collection_name: this.collectionName, fields_data: insertDatas - }) + }); if (insertResp.status.error_code !== ErrorCode.SUCCESS) { - throw new Error(`Error inserting data: ${JSON.stringify(insertResp)}`) + throw new Error(`Error inserting data: ${JSON.stringify(insertResp)}`); } - await this.client.flushSync({ collection_names: [this.collectionName] }) + await this.client.flushSync({ collection_names: [this.collectionName] }); } } -module.exports = { nodeClass: Milvus_VectorStores } +module.exports = { nodeClass: Milvus_VectorStores }; From 3155804c57faf7520d795985ca834c5575ecaf0f Mon Sep 17 00:00:00 2001 From: Eyalm321 <145741922+Eyalm321@users.noreply.github.com> Date: Fri, 23 Aug 2024 14:54:14 -0500 Subject: [PATCH 2/6] forgot to add tls to init function as well, it works now. --- .../nodes/vectorstores/Milvus/Milvus.ts | 22 +++++++++++++++++-- 1 file changed, 20 insertions(+), 2 deletions(-) diff --git a/packages/components/nodes/vectorstores/Milvus/Milvus.ts b/packages/components/nodes/vectorstores/Milvus/Milvus.ts index 935d91e752f..6c080ff6f2f 100644 --- a/packages/components/nodes/vectorstores/Milvus/Milvus.ts +++ b/packages/components/nodes/vectorstores/Milvus/Milvus.ts @@ -10,7 +10,7 @@ interface InsertRow { [x: string]: string | number[]; } -class Milvus_VectorStores implements INode { +export class Milvus_VectorStores implements INode { label: string; name: string; version: number; @@ -234,11 +234,29 @@ class Milvus_VectorStores implements INode { const milvusUser = getCredentialParam('milvusUser', credentialData, nodeData); const milvusPassword = getCredentialParam('milvusPassword', credentialData, nodeData); + // tls + const secure = nodeData.inputs?.secure as boolean; + const clientPemPath = nodeData.inputs?.clientPemPath as string; + const clientKeyPath = nodeData.inputs?.clientKeyPath as string; + const caPemPath = nodeData.inputs?.caPemPath as string; + const serverName = nodeData.inputs?.serverName as string; + + // init MilvusLibArgs const milVusArgs: MilvusLibArgs = { url: address, collectionName: collectionName, - textField: textField + textField: textField, + clientConfig: { + address: address, + ssl: secure, + tls: { + rootCertPath: caPemPath, + privateKeyPath: clientKeyPath, + certChainPath: clientPemPath, + serverName: serverName + } + } }; if (milvusUser) milVusArgs.username = milvusUser; From 7b6e1aaca6f5a376863eed3a646b338b31972e2f Mon Sep 17 00:00:00 2001 From: Eyalm321 <145741922+Eyalm321@users.noreply.github.com> Date: Sun, 25 Aug 2024 15:27:52 -0500 Subject: [PATCH 3/6] added partition support for Milvus --- .../nodes/vectorstores/Milvus/Milvus.ts | 47 +++++++++++++++++-- 1 file changed, 43 insertions(+), 4 deletions(-) diff --git a/packages/components/nodes/vectorstores/Milvus/Milvus.ts b/packages/components/nodes/vectorstores/Milvus/Milvus.ts index 6c080ff6f2f..99658376434 100644 --- a/packages/components/nodes/vectorstores/Milvus/Milvus.ts +++ b/packages/components/nodes/vectorstores/Milvus/Milvus.ts @@ -64,6 +64,12 @@ export class Milvus_VectorStores implements INode { name: 'milvusCollection', type: 'string' }, + { + label: 'Milvus Partition Name', + name: 'milvusPartition', + type: 'string', + optional: true + }, { label: 'Milvus Text Field', name: 'milvusTextField', @@ -130,8 +136,7 @@ export class Milvus_VectorStores implements INode { optional: true, description: 'Server name for the secure connection', additionalParams: true - }, - + } ]; this.outputs = [ { @@ -170,10 +175,14 @@ export class Milvus_VectorStores implements INode { const caPemPath = nodeData.inputs?.caPemPath as string; const serverName = nodeData.inputs?.serverName as string; + // partition + const partitionName = nodeData.inputs?.milvusPartition as string; + // init MilvusLibArgs const milVusArgs: MilvusLibArgs = { url: address, collectionName: collectionName, + partitionName: partitionName, clientConfig: { address: address, ssl: secure, @@ -241,7 +250,6 @@ export class Milvus_VectorStores implements INode { const caPemPath = nodeData.inputs?.caPemPath as string; const serverName = nodeData.inputs?.serverName as string; - // init MilvusLibArgs const milVusArgs: MilvusLibArgs = { url: address, @@ -356,12 +364,12 @@ const similaritySearchVectorWithScore = async (query: number[], k: number, vecto }); return results; }; - class MilvusUpsert extends Milvus { async addVectors(vectors: number[][], documents: Document[]): Promise { if (vectors.length === 0) { return; } + await this.ensureCollection(vectors, documents); const insertDatas: InsertRow[] = []; @@ -369,10 +377,17 @@ class MilvusUpsert extends Milvus { for (let index = 0; index < vectors.length; index++) { const vec = vectors[index]; const doc = documents[index]; + + console.log(`Vector Dimension: ${vec.length}`); + console.log(`Document ${index} structure:`, JSON.stringify(doc, null, 2)); + const data: InsertRow = { [this.textField]: doc.pageContent, [this.vectorField]: vec }; + + console.log(`Keyof: textField: ${this.textField}, vectorField: ${this.vectorField}`); + this.fields.forEach((field) => { switch (field) { case this.primaryField: @@ -405,6 +420,28 @@ class MilvusUpsert extends Milvus { insertDatas.push(data); } + const partitionName = this.partitionName ?? '_default'; + // Ensure the partition exists before inserting data + const partitionResp = await this.client.hasPartition({ + collection_name: this.collectionName, + partition_name: partitionName + }); + + if (partitionResp.status.error_code !== ErrorCode.SUCCESS) { + throw new Error(`Error checking partition: ${JSON.stringify(partitionResp)}`); + } + + if (!partitionResp.value) { + // Create the partition if it doesn't exist + const createPartitionResp = await this.client.createPartition({ + collection_name: this.collectionName, + partition_name: partitionName + }); + + if (createPartitionResp.error_code !== ErrorCode.SUCCESS) { + throw new Error(`Error creating partition: ${createPartitionResp}`); + } + } const descIndexResp = await this.client.describeIndex({ collection_name: this.collectionName @@ -425,6 +462,7 @@ class MilvusUpsert extends Milvus { const insertResp = await this.client.insert({ collection_name: this.collectionName, + partition_name: partitionName, // Use the partition name here fields_data: insertDatas }); @@ -436,4 +474,5 @@ class MilvusUpsert extends Milvus { } } + module.exports = { nodeClass: Milvus_VectorStores }; From 8b1f0532476e8be4fbc9200900d868d1ed44e216 Mon Sep 17 00:00:00 2001 From: Eyalm321 <145741922+Eyalm321@users.noreply.github.com> Date: Tue, 27 Aug 2024 23:47:39 -0500 Subject: [PATCH 4/6] updated to 2.0.6 , pnpm lint fix aswell --- .../nodes/vectorstores/Milvus/Milvus.ts | 331 +++++++++--------- 1 file changed, 165 insertions(+), 166 deletions(-) diff --git a/packages/components/nodes/vectorstores/Milvus/Milvus.ts b/packages/components/nodes/vectorstores/Milvus/Milvus.ts index 59d08d95957..30b2b531842 100644 --- a/packages/components/nodes/vectorstores/Milvus/Milvus.ts +++ b/packages/components/nodes/vectorstores/Milvus/Milvus.ts @@ -1,46 +1,46 @@ -import { flatten } from 'lodash'; -import { DataType, ErrorCode, MetricType, IndexType } from '@zilliz/milvus2-sdk-node'; -import { Document } from '@langchain/core/documents'; -import { MilvusLibArgs, Milvus } from '@langchain/community/vectorstores/milvus'; -import { Embeddings } from '@langchain/core/embeddings'; -import { ICommonObject, INode, INodeData, INodeOutputsValue, INodeParams, IndexingResult } from '../../../src/Interface'; -import { FLOWISE_CHATID, getBaseClasses, getCredentialData, getCredentialParam } from '../../../src/utils'; -import { howToUseFileUpload } from '../VectorStoreUtils'; +import { flatten } from 'lodash' +import { DataType, ErrorCode, MetricType, IndexType } from '@zilliz/milvus2-sdk-node' +import { Document } from '@langchain/core/documents' +import { MilvusLibArgs, Milvus } from '@langchain/community/vectorstores/milvus' +import { Embeddings } from '@langchain/core/embeddings' +import { ICommonObject, INode, INodeData, INodeOutputsValue, INodeParams, IndexingResult } from '../../../src/Interface' +import { FLOWISE_CHATID, getBaseClasses, getCredentialData, getCredentialParam } from '../../../src/utils' +import { howToUseFileUpload } from '../VectorStoreUtils' interface InsertRow { - [x: string]: string | number[]; + [x: string]: string | number[] } class Milvus_VectorStores implements INode { - label: string; - name: string; - version: number; - description: string; - type: string; - icon: string; - category: string; - badge: string; - baseClasses: string[]; - inputs: INodeParams[]; - credential: INodeParams; - outputs: INodeOutputsValue[]; + label: string + name: string + version: number + description: string + type: string + icon: string + category: string + badge: string + baseClasses: string[] + inputs: INodeParams[] + credential: INodeParams + outputs: INodeOutputsValue[] constructor() { - this.label = 'Milvus'; - this.name = 'milvus'; - this.version = 2.0; - this.type = 'Milvus'; - this.icon = 'milvus.svg'; - this.category = 'Vector Stores'; - this.description = `Upsert embedded data and perform similarity search upon query using Milvus, world's most advanced open-source vector database`; - this.baseClasses = [this.type, 'VectorStoreRetriever', 'BaseRetriever']; + this.label = 'Milvus' + this.name = 'milvus' + this.version = 2.0 + this.type = 'Milvus' + this.icon = 'milvus.svg' + this.category = 'Vector Stores' + this.description = `Upsert embedded data and perform similarity search upon query using Milvus, world's most advanced open-source vector database` + this.baseClasses = [this.type, 'VectorStoreRetriever', 'BaseRetriever'] this.credential = { label: 'Connect Credential', name: 'credential', type: 'credential', optional: true, credentialNames: ['milvusAuth'] - }; + } this.inputs = [ { label: 'Document', @@ -151,7 +151,7 @@ class Milvus_VectorStores implements INode { description: 'Server name for the secure connection', additionalParams: true } - ]; + ] this.outputs = [ { label: 'Milvus Retriever', @@ -163,181 +163,203 @@ class Milvus_VectorStores implements INode { name: 'vectorStore', baseClasses: [this.type, ...getBaseClasses(Milvus)] } - ]; + ] } //@ts-ignore vectorStoreMethods = { async upsert(nodeData: INodeData, options: ICommonObject): Promise> { // server setup - const address = nodeData.inputs?.milvusServerUrl as string; - const collectionName = nodeData.inputs?.milvusCollection as string; + const address = nodeData.inputs?.milvusServerUrl as string + const collectionName = nodeData.inputs?.milvusCollection as string // embeddings - const docs = nodeData.inputs?.document as Document[]; - const embeddings = nodeData.inputs?.embeddings as Embeddings; - const isFileUploadEnabled = nodeData.inputs?.fileUpload as boolean; + const docs = nodeData.inputs?.document as Document[] + const embeddings = nodeData.inputs?.embeddings as Embeddings + const isFileUploadEnabled = nodeData.inputs?.fileUpload as boolean // credential - const credentialData = await getCredentialData(nodeData.credential ?? '', options); - const milvusUser = getCredentialParam('milvusUser', credentialData, nodeData); - const milvusPassword = getCredentialParam('milvusPassword', credentialData, nodeData); + const credentialData = await getCredentialData(nodeData.credential ?? '', options) + const milvusUser = getCredentialParam('milvusUser', credentialData, nodeData) + const milvusPassword = getCredentialParam('milvusPassword', credentialData, nodeData) + + // tls + const secure = nodeData.inputs?.secure as boolean + const clientPemPath = nodeData.inputs?.clientPemPath as string + const clientKeyPath = nodeData.inputs?.clientKeyPath as string + const caPemPath = nodeData.inputs?.caPemPath as string + const serverName = nodeData.inputs?.serverName as string + + // partition + const partition = nodeData.inputs?.milvusPartition as string // init MilvusLibArgs const milVusArgs: MilvusLibArgs = { url: address, - collectionName: collectionName - }; + collectionName: collectionName, + partitionName: partition, + clientConfig: { + address: address, + ssl: secure, + tls: { + rootCertPath: caPemPath, + certChainPath: clientPemPath, + privateKeyPath: clientKeyPath, + serverName: serverName + } + } + } - if (milvusUser) milVusArgs.username = milvusUser; - if (milvusPassword) milVusArgs.password = milvusPassword; + if (milvusUser) milVusArgs.username = milvusUser + if (milvusPassword) milVusArgs.password = milvusPassword - const flattenDocs = docs && docs.length ? flatten(docs) : []; - const finalDocs = []; + const flattenDocs = docs && docs.length ? flatten(docs) : [] + const finalDocs = [] for (let i = 0; i < flattenDocs.length; i += 1) { if (flattenDocs[i] && flattenDocs[i].pageContent) { if (isFileUploadEnabled && options.chatId) { - flattenDocs[i].metadata = { ...flattenDocs[i].metadata, [FLOWISE_CHATID]: options.chatId }; + flattenDocs[i].metadata = { ...flattenDocs[i].metadata, [FLOWISE_CHATID]: options.chatId } } - finalDocs.push(new Document(flattenDocs[i])); + finalDocs.push(new Document(flattenDocs[i])) } } try { - const vectorStore = await MilvusUpsert.fromDocuments(finalDocs, embeddings, milVusArgs); + const vectorStore = await MilvusUpsert.fromDocuments(finalDocs, embeddings, milVusArgs) // Avoid Illegal Invocation vectorStore.similaritySearchVectorWithScore = async (query: number[], k: number, filter?: string) => { - return await similaritySearchVectorWithScore(query, k, vectorStore, undefined, filter); - }; + return await similaritySearchVectorWithScore(query, k, vectorStore, undefined, filter) + } - return { numAdded: finalDocs.length, addedDocs: finalDocs }; + return { numAdded: finalDocs.length, addedDocs: finalDocs } } catch (e) { - throw new Error(e); + throw new Error(e) } } - }; + } async init(nodeData: INodeData, _: string, options: ICommonObject): Promise { // server setup - const address = nodeData.inputs?.milvusServerUrl as string; - const collectionName = nodeData.inputs?.milvusCollection as string; - const _milvusFilter = nodeData.inputs?.milvusFilter as string; - const textField = nodeData.inputs?.milvusTextField as string; - const isFileUploadEnabled = nodeData.inputs?.fileUpload as boolean; + const address = nodeData.inputs?.milvusServerUrl as string + const collectionName = nodeData.inputs?.milvusCollection as string + const _milvusFilter = nodeData.inputs?.milvusFilter as string + const textField = nodeData.inputs?.milvusTextField as string + const isFileUploadEnabled = nodeData.inputs?.fileUpload as boolean // embeddings - const embeddings = nodeData.inputs?.embeddings as Embeddings; - const topK = nodeData.inputs?.topK as string; + const embeddings = nodeData.inputs?.embeddings as Embeddings + const topK = nodeData.inputs?.topK as string // output - const output = nodeData.outputs?.output as string; + const output = nodeData.outputs?.output as string // format data - const k = topK ? parseFloat(topK) : 4; + const k = topK ? parseFloat(topK) : 4 // credential - const credentialData = await getCredentialData(nodeData.credential ?? '', options); - const milvusUser = getCredentialParam('milvusUser', credentialData, nodeData); - const milvusPassword = getCredentialParam('milvusPassword', credentialData, nodeData); + const credentialData = await getCredentialData(nodeData.credential ?? '', options) + const milvusUser = getCredentialParam('milvusUser', credentialData, nodeData) + const milvusPassword = getCredentialParam('milvusPassword', credentialData, nodeData) // tls - const secure = nodeData.inputs?.secure as boolean; - const clientPemPath = nodeData.inputs?.clientPemPath as string; - const clientKeyPath = nodeData.inputs?.clientKeyPath as string; - const caPemPath = nodeData.inputs?.caPemPath as string; - const serverName = nodeData.inputs?.serverName as string; + const secure = nodeData.inputs?.secure as boolean + const clientPemPath = nodeData.inputs?.clientPemPath as string + const clientKeyPath = nodeData.inputs?.clientKeyPath as string + const caPemPath = nodeData.inputs?.caPemPath as string + const serverName = nodeData.inputs?.serverName as string // partition - const partitionName = nodeData.inputs?.milvusPartition ?? '_default'; + const partition = nodeData.inputs?.milvusPartition as string // init MilvusLibArgs const milVusArgs: MilvusLibArgs = { url: address, collectionName: collectionName, - partitionName: partitionName, - ssl: secure, + partitionName: partition, clientConfig: { address: address, + ssl: secure, tls: { + rootCertPath: caPemPath, certChainPath: clientPemPath, privateKeyPath: clientKeyPath, - rootCertPath: caPemPath, + serverName: serverName } }, textField: textField - }; + } - if (milvusUser) milVusArgs.username = milvusUser; - if (milvusPassword) milVusArgs.password = milvusPassword; + if (milvusUser) milVusArgs.username = milvusUser + if (milvusPassword) milVusArgs.password = milvusPassword - let milvusFilter = _milvusFilter; + let milvusFilter = _milvusFilter if (isFileUploadEnabled && options.chatId) { - if (milvusFilter) milvusFilter += ` OR ${FLOWISE_CHATID} == "${options.chatId}" OR NOT EXISTS(${FLOWISE_CHATID})`; - else milvusFilter = `${FLOWISE_CHATID} == "${options.chatId}" OR NOT EXISTS(${FLOWISE_CHATID})`; + if (milvusFilter) milvusFilter += ` OR ${FLOWISE_CHATID} == "${options.chatId}" OR NOT EXISTS(${FLOWISE_CHATID})` + else milvusFilter = `${FLOWISE_CHATID} == "${options.chatId}" OR NOT EXISTS(${FLOWISE_CHATID})` } - const vectorStore = await Milvus.fromExistingCollection(embeddings, milVusArgs); + const vectorStore = await Milvus.fromExistingCollection(embeddings, milVusArgs) // Avoid Illegal Invocation vectorStore.similaritySearchVectorWithScore = async (query: number[], k: number, filter?: string) => { - return await similaritySearchVectorWithScore(query, k, vectorStore, milvusFilter, filter); - }; + return await similaritySearchVectorWithScore(query, k, vectorStore, milvusFilter, filter) + } if (output === 'retriever') { - const retriever = vectorStore.asRetriever(k); - return retriever; + const retriever = vectorStore.asRetriever(k) + return retriever } else if (output === 'vectorStore') { - ; (vectorStore as any).k = k; + ;(vectorStore as any).k = k if (milvusFilter) { - ; (vectorStore as any).filter = milvusFilter; + ;(vectorStore as any).filter = milvusFilter } - return vectorStore; + return vectorStore } - return vectorStore; + return vectorStore } } -const checkJsonString = (value: string): { isJson: boolean; obj: any; } => { +const checkJsonString = (value: string): { isJson: boolean; obj: any } => { try { - const result = JSON.parse(value); - return { isJson: true, obj: result }; + const result = JSON.parse(value) + return { isJson: true, obj: result } } catch (e) { - return { isJson: false, obj: null }; + return { isJson: false, obj: null } } -}; +} const similaritySearchVectorWithScore = async (query: number[], k: number, vectorStore: Milvus, milvusFilter?: string, filter?: string) => { const hasColResp = await vectorStore.client.hasCollection({ collection_name: vectorStore.collectionName - }); + }) if (hasColResp.status.error_code !== ErrorCode.SUCCESS) { - throw new Error(`Error checking collection: ${hasColResp}`); + throw new Error(`Error checking collection: ${hasColResp}`) } if (hasColResp.value === false) { - throw new Error(`Collection not found: ${vectorStore.collectionName}, please create collection before search.`); + throw new Error(`Collection not found: ${vectorStore.collectionName}, please create collection before search.`) } - const filterStr = milvusFilter ?? filter ?? ''; + const filterStr = milvusFilter ?? filter ?? '' - await vectorStore.grabCollectionFields(); + await vectorStore.grabCollectionFields() const loadResp = await vectorStore.client.loadCollectionSync({ collection_name: vectorStore.collectionName - }); + }) if (loadResp.error_code !== ErrorCode.SUCCESS) { - throw new Error(`Error loading collection: ${loadResp}`); + throw new Error(`Error loading collection: ${loadResp}`) } - const outputFields = vectorStore.fields.filter((field) => field !== vectorStore.vectorField); + const outputFields = vectorStore.fields.filter((field) => field !== vectorStore.vectorField) const search_params: any = { anns_field: vectorStore.vectorField, topk: k.toString(), metric_type: vectorStore.indexCreateParams.metric_type, params: vectorStore.indexSearchParams - }; + } const searchResp = await vectorStore.client.search({ collection_name: vectorStore.collectionName, search_params, @@ -345,49 +367,49 @@ const similaritySearchVectorWithScore = async (query: number[], k: number, vecto vector_type: DataType.FloatVector, vectors: [query], filter: filterStr - }); + }) if (searchResp.status.error_code !== ErrorCode.SUCCESS) { - throw new Error(`Error searching data: ${JSON.stringify(searchResp)}`); + throw new Error(`Error searching data: ${JSON.stringify(searchResp)}`) } - const results: [Document, number][] = []; + const results: [Document, number][] = [] searchResp.results.forEach((result) => { const fields = { pageContent: '', metadata: {} as Record - }; + } Object.keys(result).forEach((key) => { if (key === vectorStore.textField) { - fields.pageContent = result[key]; + fields.pageContent = result[key] } else if (vectorStore.fields.includes(key) || key === vectorStore.primaryField) { if (typeof result[key] === 'string') { - const { isJson, obj } = checkJsonString(result[key]); - fields.metadata[key] = isJson ? obj : result[key]; + const { isJson, obj } = checkJsonString(result[key]) + fields.metadata[key] = isJson ? obj : result[key] } else { - fields.metadata[key] = result[key]; + fields.metadata[key] = result[key] } } - }); - results.push([new Document(fields), result.score]); - }); - return results; -}; + }) + results.push([new Document(fields), result.score]) + }) + return results +} class MilvusUpsert extends Milvus { async addVectors(vectors: number[][], documents: Document[]): Promise { if (vectors.length === 0) { - return; + return } - await this.ensureCollection(vectors, documents); + await this.ensureCollection(vectors, documents) - const insertDatas: InsertRow[] = []; + const insertDatas: InsertRow[] = [] for (let index = 0; index < vectors.length; index++) { - const vec = vectors[index]; - const doc = documents[index]; + const vec = vectors[index] + const doc = documents[index] const data: InsertRow = { [this.textField]: doc.pageContent, [this.vectorField]: vec - }; + } this.fields.forEach((field) => { switch (field) { case this.primaryField: @@ -395,58 +417,35 @@ class MilvusUpsert extends Milvus { if (doc.metadata[this.primaryField] === undefined) { throw new Error( `The Collection's primaryField is configured with autoId=false, thus its value must be provided through metadata.` - ); + ) } - data[field] = doc.metadata[this.primaryField]; + data[field] = doc.metadata[this.primaryField] } - break; + break case this.textField: - data[field] = doc.pageContent; - break; + data[field] = doc.pageContent + break case this.vectorField: - data[field] = vec; - break; + data[field] = vec + break default: // metadata fields if (doc.metadata[field] === undefined) { - throw new Error(`The field "${field}" is not provided in documents[${index}].metadata.`); + throw new Error(`The field "${field}" is not provided in documents[${index}].metadata.`) } else if (typeof doc.metadata[field] === 'object') { - data[field] = JSON.stringify(doc.metadata[field]); + data[field] = JSON.stringify(doc.metadata[field]) } else { - data[field] = doc.metadata[field]; + data[field] = doc.metadata[field] } - break; + break } - }); + }) - insertDatas.push(data); - } - - const partitionName = this.partitionName ?? '_default'; - - const partitionResp = await this.client.hasPartition({ - collection_name: this.collectionName, - partition_name: partitionName - }); - - if (partitionResp.status.error_code !== ErrorCode.SUCCESS) { - throw new Error(`Error checking partition: ${JSON.stringify(partitionResp)}`); - } - - if (!partitionResp.value) { - // Create the partition if it doesn't exist - const createPartitionResp = await this.client.createPartition({ - collection_name: this.collectionName, - partition_name: partitionName - }); - - if (createPartitionResp.error_code !== ErrorCode.SUCCESS) { - throw new Error(`Error creating partition: ${createPartitionResp}`); - } + insertDatas.push(data) } const descIndexResp = await this.client.describeIndex({ collection_name: this.collectionName - }); + }) if (descIndexResp.status.error_code === ErrorCode.IndexNotExist) { const resp = await this.client.createIndex({ @@ -455,23 +454,23 @@ class MilvusUpsert extends Milvus { index_name: `myindex_${Date.now().toString()}`, index_type: IndexType.AUTOINDEX, metric_type: MetricType.L2 - }); + }) if (resp.error_code !== ErrorCode.SUCCESS) { - throw new Error(`Error creating index`); + throw new Error(`Error creating index`) } } const insertResp = await this.client.insert({ collection_name: this.collectionName, fields_data: insertDatas - }); + }) if (insertResp.status.error_code !== ErrorCode.SUCCESS) { - throw new Error(`Error inserting data: ${JSON.stringify(insertResp)}`); + throw new Error(`Error inserting data: ${JSON.stringify(insertResp)}`) } - await this.client.flushSync({ collection_names: [this.collectionName] }); + await this.client.flushSync({ collection_names: [this.collectionName] }) } } -module.exports = { nodeClass: Milvus_VectorStores }; +module.exports = { nodeClass: Milvus_VectorStores } From e35c27ac451900d1ccd81af6303b3c37131ba591 Mon Sep 17 00:00:00 2001 From: Eyalm321 <145741922+Eyalm321@users.noreply.github.com> Date: Tue, 27 Aug 2024 23:53:41 -0500 Subject: [PATCH 5/6] ensure it points to _default partition if none provided --- packages/components/nodes/vectorstores/Milvus/Milvus.ts | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/packages/components/nodes/vectorstores/Milvus/Milvus.ts b/packages/components/nodes/vectorstores/Milvus/Milvus.ts index 30b2b531842..331770f7080 100644 --- a/packages/components/nodes/vectorstores/Milvus/Milvus.ts +++ b/packages/components/nodes/vectorstores/Milvus/Milvus.ts @@ -191,13 +191,13 @@ class Milvus_VectorStores implements INode { const serverName = nodeData.inputs?.serverName as string // partition - const partition = nodeData.inputs?.milvusPartition as string + const partitionName = nodeData.inputs?.milvusPartition ?? '_default' // init MilvusLibArgs const milVusArgs: MilvusLibArgs = { url: address, collectionName: collectionName, - partitionName: partition, + partitionName: partitionName, clientConfig: { address: address, ssl: secure, @@ -270,13 +270,13 @@ class Milvus_VectorStores implements INode { const serverName = nodeData.inputs?.serverName as string // partition - const partition = nodeData.inputs?.milvusPartition as string + const partitionName = nodeData.inputs?.milvusPartition ?? '_default' // init MilvusLibArgs const milVusArgs: MilvusLibArgs = { url: address, collectionName: collectionName, - partitionName: partition, + partitionName: partitionName, clientConfig: { address: address, ssl: secure, From 730340985c58b65a04ae7688a7962c54923b6614 Mon Sep 17 00:00:00 2001 From: Henry Heng Date: Fri, 30 Aug 2024 11:28:55 +0100 Subject: [PATCH 6/6] update milvus versioning --- .../nodes/vectorstores/Milvus/Milvus.ts | 18 ++++++++++++------ 1 file changed, 12 insertions(+), 6 deletions(-) diff --git a/packages/components/nodes/vectorstores/Milvus/Milvus.ts b/packages/components/nodes/vectorstores/Milvus/Milvus.ts index 331770f7080..8de7d478307 100644 --- a/packages/components/nodes/vectorstores/Milvus/Milvus.ts +++ b/packages/components/nodes/vectorstores/Milvus/Milvus.ts @@ -28,7 +28,7 @@ class Milvus_VectorStores implements INode { constructor() { this.label = 'Milvus' this.name = 'milvus' - this.version = 2.0 + this.version = 2.1 this.type = 'Milvus' this.icon = 'milvus.svg' this.category = 'Vector Stores' @@ -197,8 +197,11 @@ class Milvus_VectorStores implements INode { const milVusArgs: MilvusLibArgs = { url: address, collectionName: collectionName, - partitionName: partitionName, - clientConfig: { + partitionName: partitionName + } + + if (secure) { + milVusArgs.clientConfig = { address: address, ssl: secure, tls: { @@ -277,7 +280,11 @@ class Milvus_VectorStores implements INode { url: address, collectionName: collectionName, partitionName: partitionName, - clientConfig: { + textField: textField + } + + if (secure) { + milVusArgs.clientConfig = { address: address, ssl: secure, tls: { @@ -286,8 +293,7 @@ class Milvus_VectorStores implements INode { privateKeyPath: clientKeyPath, serverName: serverName } - }, - textField: textField + } } if (milvusUser) milVusArgs.username = milvusUser