Skip to content

Commit

Permalink
Merge pull request FlowiseAI#1108 from vinodkiran/FEATURE/redis-vecto…
Browse files Browse the repository at this point in the history
…rstore

Feature/RedisVectorStore
  • Loading branch information
HenryHengZJ authored Oct 25, 2023
2 parents ee7644d + 4988627 commit a5d9596
Show file tree
Hide file tree
Showing 8 changed files with 406 additions and 22 deletions.
25 changes: 25 additions & 0 deletions packages/components/credentials/RedisCacheUrlApi.credential.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
import { INodeParams, INodeCredential } from '../src/Interface'

class RedisCacheUrlApi implements INodeCredential {
label: string
name: string
version: number
description: string
inputs: INodeParams[]

constructor() {
this.label = 'Redis Cache URL'
this.name = 'redisCacheUrlApi'
this.version = 1.0
this.inputs = [
{
label: 'Redis URL',
name: 'redisUrl',
type: 'string',
default: '127.0.0.1'
}
]
}
}

module.exports = { credClass: RedisCacheUrlApi }
29 changes: 18 additions & 11 deletions packages/components/nodes/cache/RedisCache/RedisCache.ts
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ class RedisCache implements INode {
name: 'credential',
type: 'credential',
optional: true,
credentialNames: ['redisCacheApi']
credentialNames: ['redisCacheApi', 'redisCacheUrlApi']
}
this.inputs = [
{
Expand All @@ -48,17 +48,24 @@ class RedisCache implements INode {
const ttl = nodeData.inputs?.ttl as string

const credentialData = await getCredentialData(nodeData.credential ?? '', options)
const username = getCredentialParam('redisCacheUser', credentialData, nodeData)
const password = getCredentialParam('redisCachePwd', credentialData, nodeData)
const portStr = getCredentialParam('redisCachePort', credentialData, nodeData)
const host = getCredentialParam('redisCacheHost', credentialData, nodeData)
const redisUrl = getCredentialParam('redisUrl', credentialData, nodeData)

const client = new Redis({
port: portStr ? parseInt(portStr) : 6379,
host,
username,
password
})
let client: Redis
if (!redisUrl || redisUrl === '') {
const username = getCredentialParam('redisCacheUser', credentialData, nodeData)
const password = getCredentialParam('redisCachePwd', credentialData, nodeData)
const portStr = getCredentialParam('redisCachePort', credentialData, nodeData)
const host = getCredentialParam('redisCacheHost', credentialData, nodeData)

client = new Redis({
port: portStr ? parseInt(portStr) : 6379,
host,
username,
password
})
} else {
client = new Redis(redisUrl)
}

const redisClient = new LangchainRedisCache(client)

Expand Down
30 changes: 19 additions & 11 deletions packages/components/nodes/cache/RedisCache/RedisEmbeddingsCache.ts
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ class RedisEmbeddingsCache implements INode {
name: 'credential',
type: 'credential',
optional: true,
credentialNames: ['redisCacheApi']
credentialNames: ['redisCacheApi', 'redisCacheUrlApi']
}
this.inputs = [
{
Expand Down Expand Up @@ -63,17 +63,25 @@ class RedisEmbeddingsCache implements INode {
const underlyingEmbeddings = nodeData.inputs?.embeddings as Embeddings

const credentialData = await getCredentialData(nodeData.credential ?? '', options)
const username = getCredentialParam('redisCacheUser', credentialData, nodeData)
const password = getCredentialParam('redisCachePwd', credentialData, nodeData)
const portStr = getCredentialParam('redisCachePort', credentialData, nodeData)
const host = getCredentialParam('redisCacheHost', credentialData, nodeData)
const redisUrl = getCredentialParam('redisUrl', credentialData, nodeData)

let client: Redis
if (!redisUrl || redisUrl === '') {
const username = getCredentialParam('redisCacheUser', credentialData, nodeData)
const password = getCredentialParam('redisCachePwd', credentialData, nodeData)
const portStr = getCredentialParam('redisCachePort', credentialData, nodeData)
const host = getCredentialParam('redisCacheHost', credentialData, nodeData)

client = new Redis({
port: portStr ? parseInt(portStr) : 6379,
host,
username,
password
})
} else {
client = new Redis(redisUrl)
}

const client = new Redis({
port: portStr ? parseInt(portStr) : 6379,
host,
username,
password
})
ttl ??= '3600'
let ttlNumber = parseInt(ttl, 10)
const redisStore = new RedisByteStore({
Expand Down
215 changes: 215 additions & 0 deletions packages/components/nodes/vectorstores/Redis/RedisSearchBase.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,215 @@
import {
getBaseClasses,
getCredentialData,
getCredentialParam,
ICommonObject,
INodeData,
INodeOutputsValue,
INodeParams
} from '../../../src'

import { Embeddings } from 'langchain/embeddings/base'
import { VectorStore } from 'langchain/vectorstores/base'
import { Document } from 'langchain/document'
import { createClient, SearchOptions } from 'redis'
import { RedisVectorStore } from 'langchain/vectorstores/redis'
import { escapeSpecialChars, unEscapeSpecialChars } from './utils'

export abstract class RedisSearchBase {
label: string
name: string
version: number
description: string
type: string
icon: string
category: string
baseClasses: string[]
inputs: INodeParams[]
credential: INodeParams
outputs: INodeOutputsValue[]
redisClient: ReturnType<typeof createClient>

protected constructor() {
this.type = 'Redis'
this.icon = 'redis.svg'
this.category = 'Vector Stores'
this.baseClasses = [this.type, 'VectorStoreRetriever', 'BaseRetriever']
this.credential = {
label: 'Connect Credential',
name: 'credential',
type: 'credential',
credentialNames: ['redisCacheUrlApi', 'redisCacheApi']
}
this.inputs = [
{
label: 'Embeddings',
name: 'embeddings',
type: 'Embeddings'
},
{
label: 'Index Name',
name: 'indexName',
placeholder: '<VECTOR_INDEX_NAME>',
type: 'string'
},
{
label: 'Replace Index?',
name: 'replaceIndex',
description: 'Selecting this option will delete the existing index and recreate a new one',
default: false,
type: 'boolean'
},
{
label: 'Content Field',
name: 'contentKey',
description: 'Name of the field (column) that contains the actual content',
type: 'string',
default: 'content',
additionalParams: true,
optional: true
},
{
label: 'Metadata Field',
name: 'metadataKey',
description: 'Name of the field (column) that contains the metadata of the document',
type: 'string',
default: 'metadata',
additionalParams: true,
optional: true
},
{
label: 'Vector Field',
name: 'vectorKey',
description: 'Name of the field (column) that contains the vector',
type: 'string',
default: 'content_vector',
additionalParams: true,
optional: true
},
{
label: 'Top K',
name: 'topK',
description: 'Number of top results to fetch. Default to 4',
placeholder: '4',
type: 'number',
additionalParams: true,
optional: true
}
]
this.outputs = [
{
label: 'Redis Retriever',
name: 'retriever',
baseClasses: this.baseClasses
},
{
label: 'Redis Vector Store',
name: 'vectorStore',
baseClasses: [this.type, ...getBaseClasses(RedisVectorStore)]
}
]
}

abstract constructVectorStore(
embeddings: Embeddings,
indexName: string,
replaceIndex: boolean,
docs: Document<Record<string, any>>[] | undefined
): Promise<VectorStore>

async init(nodeData: INodeData, _: string, options: ICommonObject, docs: Document<Record<string, any>>[] | undefined): Promise<any> {
const credentialData = await getCredentialData(nodeData.credential ?? '', options)
const indexName = nodeData.inputs?.indexName as string
let contentKey = nodeData.inputs?.contentKey as string
let metadataKey = nodeData.inputs?.metadataKey as string
let vectorKey = nodeData.inputs?.vectorKey as string
const embeddings = nodeData.inputs?.embeddings as Embeddings
const topK = nodeData.inputs?.topK as string
const replaceIndex = nodeData.inputs?.replaceIndex as boolean
const k = topK ? parseFloat(topK) : 4
const output = nodeData.outputs?.output as string

let redisUrl = getCredentialParam('redisUrl', credentialData, nodeData)
if (!redisUrl || redisUrl === '') {
const username = getCredentialParam('redisCacheUser', credentialData, nodeData)
const password = getCredentialParam('redisCachePwd', credentialData, nodeData)
const portStr = getCredentialParam('redisCachePort', credentialData, nodeData)
const host = getCredentialParam('redisCacheHost', credentialData, nodeData)

redisUrl = 'redis://' + username + ':' + password + '@' + host + ':' + portStr
}

this.redisClient = createClient({ url: redisUrl })
await this.redisClient.connect()

const vectorStore = await this.constructVectorStore(embeddings, indexName, replaceIndex, docs)
if (!contentKey || contentKey === '') contentKey = 'content'
if (!metadataKey || metadataKey === '') metadataKey = 'metadata'
if (!vectorKey || vectorKey === '') vectorKey = 'content_vector'

const buildQuery = (query: number[], k: number, filter?: string[]): [string, SearchOptions] => {
const vectorScoreField = 'vector_score'

let hybridFields = '*'
// if a filter is set, modify the hybrid query
if (filter && filter.length) {
// `filter` is a list of strings, then it's applied using the OR operator in the metadata key
hybridFields = `@${metadataKey}:(${filter.map(escapeSpecialChars).join('|')})`
}

const baseQuery = `${hybridFields} => [KNN ${k} @${vectorKey} $vector AS ${vectorScoreField}]`
const returnFields = [metadataKey, contentKey, vectorScoreField]

const options: SearchOptions = {
PARAMS: {
vector: Buffer.from(new Float32Array(query).buffer)
},
RETURN: returnFields,
SORTBY: vectorScoreField,
DIALECT: 2,
LIMIT: {
from: 0,
size: k
}
}

return [baseQuery, options]
}

vectorStore.similaritySearchVectorWithScore = async (
query: number[],
k: number,
filter?: string[]
): Promise<[Document, number][]> => {
const results = await this.redisClient.ft.search(indexName, ...buildQuery(query, k, filter))
const result: [Document, number][] = []

if (results.total) {
for (const res of results.documents) {
if (res.value) {
const document = res.value
if (document.vector_score) {
const metadataString = unEscapeSpecialChars(document[metadataKey] as string)
result.push([
new Document({
pageContent: document[contentKey] as string,
metadata: JSON.parse(metadataString)
}),
Number(document.vector_score)
])
}
}
}
}
return result
}

if (output === 'retriever') {
return vectorStore.asRetriever(k)
} else if (output === 'vectorStore') {
;(vectorStore as any).k = k
return vectorStore
}
return vectorStore
}
}
42 changes: 42 additions & 0 deletions packages/components/nodes/vectorstores/Redis/Redis_Existing.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
import { ICommonObject, INode, INodeData } from '../../../src/Interface'
import { Embeddings } from 'langchain/embeddings/base'
import { VectorStore } from 'langchain/vectorstores/base'
import { RedisVectorStore, RedisVectorStoreConfig } from 'langchain/vectorstores/redis'
import { Document } from 'langchain/document'

import { RedisSearchBase } from './RedisSearchBase'

class RedisExisting_VectorStores extends RedisSearchBase implements INode {
constructor() {
super()
this.label = 'Redis Load Existing Index'
this.name = 'RedisIndex'
this.version = 1.0
this.description = 'Load existing index from Redis (i.e: Document has been upserted)'

// Remove deleteIndex from inputs as it is not applicable while fetching data from Redis
let input = this.inputs.find((i) => i.name === 'deleteIndex')
if (input) this.inputs.splice(this.inputs.indexOf(input), 1)
}

async constructVectorStore(
embeddings: Embeddings,
indexName: string,
// eslint-disable-next-line unused-imports/no-unused-vars
replaceIndex: boolean,
_: Document<Record<string, any>>[]
): Promise<VectorStore> {
const storeConfig: RedisVectorStoreConfig = {
redisClient: this.redisClient,
indexName: indexName
}

return new RedisVectorStore(embeddings, storeConfig)
}

async init(nodeData: INodeData, _: string, options: ICommonObject): Promise<any> {
return super.init(nodeData, _, options, undefined)
}
}

module.exports = { nodeClass: RedisExisting_VectorStores }
Loading

0 comments on commit a5d9596

Please sign in to comment.