Improve CSV Loader
JJK801 committed Jan 8, 2025
1 parent c2c1ca9 commit 2d0bb5e
Showing 5 changed files with 34,113 additions and 33,947 deletions.
125 changes: 42 additions & 83 deletions packages/components/nodes/documentloaders/Csv/Csv.ts
@@ -1,7 +1,6 @@
import { omit } from 'lodash'
import { TextSplitter } from 'langchain/text_splitter'
import { CSVLoader } from '@langchain/community/document_loaders/fs/csv'
import { getFileFromStorage, handleEscapeCharacters } from '../../../src'
import { CSVLoader } from './CsvLoader'
import { getFileFromStorage, handleDocumentLoaderDocuments, handleDocumentLoaderMetadata, handleDocumentLoaderOutput } from '../../../src'
import { ICommonObject, IDocument, INode, INodeData, INodeOutputsValue, INodeParams } from '../../../src/Interface'

class Csv_DocumentLoaders implements INode {
@@ -19,7 +18,7 @@ class Csv_DocumentLoaders implements INode {
constructor() {
this.label = 'Csv File'
this.name = 'csvFile'
this.version = 2.0
this.version = 3.0
this.type = 'Document'
this.icon = 'csv.svg'
this.category = 'Document Loaders'
@@ -82,21 +81,11 @@ class Csv_DocumentLoaders implements INode {
]
}

async init(nodeData: INodeData, _: string, options: ICommonObject): Promise<any> {
const textSplitter = nodeData.inputs?.textSplitter as TextSplitter
getFiles(nodeData: INodeData) {
const csvFileBase64 = nodeData.inputs?.csvFile as string
const columnName = nodeData.inputs?.columnName as string
const metadata = nodeData.inputs?.metadata
const output = nodeData.outputs?.output as string
const _omitMetadataKeys = nodeData.inputs?.omitMetadataKeys as string

let omitMetadataKeys: string[] = []
if (_omitMetadataKeys) {
omitMetadataKeys = _omitMetadataKeys.split(',').map((key) => key.trim())
}

let docs: IDocument[] = []
let files: string[] = []
let fromStorage: boolean = true

if (csvFileBase64.startsWith('FILE-STORAGE::')) {
const fileName = csvFileBase64.replace('FILE-STORAGE::', '')
@@ -105,86 +94,56 @@
} else {
files = [fileName]
}
const chatflowid = options.chatflowid

for (const file of files) {
if (!file) continue
const fileData = await getFileFromStorage(file, chatflowid)
const blob = new Blob([fileData])
const loader = new CSVLoader(blob, columnName.trim().length === 0 ? undefined : columnName.trim())

if (textSplitter) {
docs = await loader.load()
docs = await textSplitter.splitDocuments(docs)
} else {
docs.push(...(await loader.load()))
}
}
} else {
if (csvFileBase64.startsWith('[') && csvFileBase64.endsWith(']')) {
files = JSON.parse(csvFileBase64)
} else {
files = [csvFileBase64]
}

for (const file of files) {
if (!file) continue
const splitDataURI = file.split(',')
splitDataURI.pop()
const bf = Buffer.from(splitDataURI.pop() || '', 'base64')
const blob = new Blob([bf])
const loader = new CSVLoader(blob, columnName.trim().length === 0 ? undefined : columnName.trim())

if (textSplitter) {
docs = await loader.load()
docs = await textSplitter.splitDocuments(docs)
} else {
docs.push(...(await loader.load()))
}
}
fromStorage = false
}

if (metadata) {
const parsedMetadata = typeof metadata === 'object' ? metadata : JSON.parse(metadata)
docs = docs.map((doc) => ({
...doc,
metadata:
_omitMetadataKeys === '*'
? {
...parsedMetadata
}
: omit(
{
...doc.metadata,
...parsedMetadata
},
omitMetadataKeys
)
}))
return { files, fromStorage }
}

async getFileData(file: string, { chatflowid }: { chatflowid: string }, fromStorage?: boolean) {
if (fromStorage) {
return getFileFromStorage(file, chatflowid)
} else {
docs = docs.map((doc) => ({
...doc,
metadata:
_omitMetadataKeys === '*'
? {}
: omit(
{
...doc.metadata
},
omitMetadataKeys
)
}))
const splitDataURI = file.split(',')
splitDataURI.pop()
return Buffer.from(splitDataURI.pop() || '', 'base64')
}
}

if (output === 'document') {
return docs
} else {
let finaltext = ''
for (const doc of docs) {
finaltext += `${doc.pageContent}\n`
}
return handleEscapeCharacters(finaltext, false)
async init(nodeData: INodeData, _: string, options: ICommonObject): Promise<any> {
const textSplitter = nodeData.inputs?.textSplitter as TextSplitter
const columnName = nodeData.inputs?.columnName as string
const metadata = nodeData.inputs?.metadata
const output = nodeData.outputs?.output as string
const _omitMetadataKeys = nodeData.inputs?.omitMetadataKeys as string

let docs: IDocument[] = []

const chatflowid = options.chatflowid

const { files, fromStorage } = this.getFiles(nodeData)

for (const file of files) {
if (!file) continue

const fileData = await this.getFileData(file, { chatflowid }, fromStorage)
const blob = new Blob([fileData])
const loader = new CSVLoader(blob, columnName.trim().length === 0 ? undefined : columnName.trim())

// use spread instead of push, because push raises RangeError: Maximum call stack size exceeded when there are too many docs
docs = [...docs, ...(await handleDocumentLoaderDocuments(loader, textSplitter))]
}

docs = handleDocumentLoaderMetadata(docs, _omitMetadataKeys, metadata)

return handleDocumentLoaderOutput(docs, output)
}
}
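
For context, the refactored node resolves file references in two ways: values prefixed with FILE-STORAGE:: name files persisted in Flowise storage (getFiles returns fromStorage = true and getFileData reads them via getFileFromStorage), while anything else is treated as base64 data URIs embedded in the flow. The sketch below is not part of the commit; it is a hypothetical, standalone illustration of that resolution logic, with the helper names resolveFiles and dataUriToBuffer made up for the example.

type ResolvedFiles = { files: string[]; fromStorage: boolean }

// Hypothetical standalone version of getFiles(): a FILE-STORAGE:: prefix points at
// stored files (optionally a JSON array of names); anything else is base64 data URIs.
function resolveFiles(csvFileBase64: string): ResolvedFiles {
    if (csvFileBase64.startsWith('FILE-STORAGE::')) {
        const fileName = csvFileBase64.replace('FILE-STORAGE::', '')
        const files = fileName.startsWith('[') && fileName.endsWith(']') ? (JSON.parse(fileName) as string[]) : [fileName]
        return { files, fromStorage: true }
    }
    const files =
        csvFileBase64.startsWith('[') && csvFileBase64.endsWith(']') ? (JSON.parse(csvFileBase64) as string[]) : [csvFileBase64]
    return { files, fromStorage: false }
}

// Hypothetical standalone version of the data-URI branch of getFileData():
// the trailing segment is discarded and the preceding segment is decoded as base64.
function dataUriToBuffer(file: string): Buffer {
    const splitDataURI = file.split(',')
    splitDataURI.pop()
    return Buffer.from(splitDataURI.pop() || '', 'base64')
}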

74 changes: 74 additions & 0 deletions packages/components/nodes/documentloaders/Csv/CsvLoader.ts
@@ -0,0 +1,74 @@
import { TextLoader } from 'langchain/document_loaders/fs/text'
import Papa from 'papaparse'

type CSVLoaderOptions = {
// Return a specific column by key (string) or index (integer)
column?: string | number
// Force separator (default: auto detect)
separator?: string
}

/**
* A class that extends the TextLoader class. It represents a document
* loader that loads documents from a CSV file. It has a constructor that
* takes a `filePathOrBlob` parameter representing the path to the CSV
* file or a Blob object, and an optional `options` parameter of type
* `CSVLoaderOptions` or a string representing the column to use as the
* document's pageContent.
*/
export class CSVLoader extends TextLoader {
protected options: CSVLoaderOptions = {}

constructor(filePathOrBlob: ConstructorParameters<typeof TextLoader>[0], options?: CSVLoaderOptions | string) {
super(filePathOrBlob)

if (typeof options === 'string') {
this.options = { column: options }
} else {
this.options = options ?? this.options
}
}
/**
* A protected method that parses the raw CSV data and returns an array of
* strings representing the pageContent of each document. It uses the
* `papaparse` library to parse the CSV data. If
* the `column` option is specified, it checks if the column exists in the
* CSV file and returns the values of that column as the pageContent. If
* the `column` option is not specified, it converts each row of the CSV
* data into key/value pairs and joins them with newline characters.
* @param raw The raw CSV data to be parsed.
* @returns An array of strings representing the pageContent of each document.
*/
async parse(raw: string): Promise<string[]> {
const { column, separator } = this.options

const {
data: parsed,
meta: { fields = [] }
} = Papa.parse<{ [K: string]: string }>(raw.trim(), {
delimiter: separator,
header: true
})

if (column !== undefined) {
if (!fields.length) {
throw new Error(`Unable to resolve fields from header.`)
}

let searchIdx = column

if (typeof column == 'number') {
searchIdx = fields[column]
}

if (!fields.includes(searchIdx as string)) {
throw new Error(`Column ${column} not found in CSV file.`)
}

// Note TextLoader will raise an exception if the value is null.
return parsed.map((row) => row[searchIdx])
}

return parsed.map((row) => fields.map((key) => `${key.trim() || '_0'}: ${row[key]?.trim()}`).join('\n'))
}
}
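
Not part of the diff — a minimal usage sketch of the new loader, assuming Node 18+ (global Blob) and a made-up CSV payload. Passing a string as the second argument keeps the old column-only behaviour, while the options object adds column-by-index selection and an explicit separator.

import { CSVLoader } from './CsvLoader'

const blob = new Blob(['id,name,price\n1,Apple,1.20\n2,Banana,0.80'])

// String argument: backward-compatible, only the named column becomes pageContent
const byColumn = new CSVLoader(blob, 'name')
const docs = await byColumn.load() // pageContent: 'Apple', 'Banana'

// Options object: select the column by index and force the separator
const byIndex = new CSVLoader(blob, { column: 1, separator: ',' })
await byIndex.load() // same result, the index is resolved against the parsed header

// No column: each row becomes "key: value" pairs joined with newlines
const allColumns = new CSVLoader(blob)
await allColumns.load() // e.g. 'id: 1\nname: Apple\nprice: 1.20'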
2 changes: 2 additions & 0 deletions packages/components/package.json
@@ -114,6 +114,7 @@
"object-hash": "^3.0.0",
"ollama": "^0.5.11",
"openai": "^4.57.3",
"papaparse": "^5.4.1",
"pdf-parse": "^1.1.1",
"pdfjs-dist": "^3.7.107",
"pg": "^8.11.2",
@@ -139,6 +140,7 @@
"@types/lodash": "^4.14.202",
"@types/node-fetch": "2.6.2",
"@types/object-hash": "^3.0.2",
"@types/papaparse": "^5.3.15",
"@types/pg": "^8.10.2",
"@types/ws": "^8.5.3",
"babel-register": "^6.26.0",
69 changes: 69 additions & 0 deletions packages/components/src/utils.ts
@@ -7,9 +7,13 @@ import { z } from 'zod'
import { DataSource } from 'typeorm'
import { ICommonObject, IDatabaseEntity, IDocument, IMessage, INodeData, IVariable, MessageContentImageUrl } from './Interface'
import { AES, enc } from 'crypto-js'
import { omit } from 'lodash'
import { AIMessage, HumanMessage, BaseMessage } from '@langchain/core/messages'
import { Document } from '@langchain/core/documents'
import { getFileFromStorage } from './storageUtils'
import { customGet } from '../nodes/sequentialagents/commonUtils'
import { TextSplitter } from 'langchain/text_splitter'
import { DocumentLoader } from 'langchain/document_loaders/base'

export const numberOrExpressionRegex = '^(\\d+\\.?\\d*|{{.*}})$' //return true if string consists only numbers OR expression {{}}
export const notEmptyRegex = '(.|\\s)*\\S(.|\\s)*' //return true if string is not empty or blank
Expand Down Expand Up @@ -1036,3 +1040,68 @@ export const resolveFlowObjValue = (obj: any, sourceObj: any): any => {
return obj
}
}

export const handleDocumentLoaderOutput = (docs: Document[], output: string) => {
if (output === 'document') {
return docs
} else {
let finaltext = ''
for (const doc of docs) {
finaltext += `${doc.pageContent}\n`
}
return handleEscapeCharacters(finaltext, false)
}
}

export const parseDocumentLoaderMetadata = (metadata: object | string): object => {
if (!metadata) return {}

if (typeof metadata !== 'object') {
return JSON.parse(metadata)
}

return metadata
}

export const handleDocumentLoaderMetadata = (
docs: Document[],
_omitMetadataKeys: string,
metadata: object | string = {},
sourceIdKey?: string
) => {
let omitMetadataKeys: string[] = []
if (_omitMetadataKeys) {
omitMetadataKeys = _omitMetadataKeys.split(',').map((key) => key.trim())
}

metadata = parseDocumentLoaderMetadata(metadata)

return docs.map((doc) => ({
...doc,
metadata:
_omitMetadataKeys === '*'
? metadata
: omit(
{
...metadata,
...doc.metadata,
...(sourceIdKey ? { [sourceIdKey]: doc.metadata[sourceIdKey] || sourceIdKey } : undefined)
},
omitMetadataKeys
)
}))
}

export const handleDocumentLoaderDocuments = async (loader: DocumentLoader, textSplitter?: TextSplitter) => {
let docs: Document[] = []

if (textSplitter) {
let splittedDocs = await loader.load()
splittedDocs = await textSplitter.splitDocuments(splittedDocs)
docs = splittedDocs
} else {
docs = await loader.load()
}

return docs
}
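
Not part of the diff — a quick, hypothetical illustration of how the new helpers are expected to behave, based on the implementations above. The document contents and metadata values are made up.

import { Document } from '@langchain/core/documents'
// assuming the helpers above are in scope (e.g. imported from 'flowise-components' src/utils)

// One document as the CSV loader might produce it
const docs = [new Document({ pageContent: 'name: Apple', metadata: { source: 'fruits.csv', line: 1 } })]

// Merge flow-level metadata and omit the 'line' key
handleDocumentLoaderMetadata(docs, 'line', { chatflowid: 'abc' })
// -> metadata: { chatflowid: 'abc', source: 'fruits.csv' }

// '*' keeps only the supplied metadata object and drops the loader's own keys
handleDocumentLoaderMetadata(docs, '*', { chatflowid: 'abc' })
// -> metadata: { chatflowid: 'abc' }

// Output selection: 'document' returns Document[], anything else concatenates pageContent
handleDocumentLoaderOutput(docs, 'document') // -> Document[]
handleDocumentLoaderOutput(docs, 'text') // -> concatenated pageContent as a single string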
