Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feat: Support Google Cloud Storage #4061

Open
wants to merge 5 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
82 changes: 43 additions & 39 deletions CONTRIBUTING.md

Large diffs are not rendered by default.

6 changes: 5 additions & 1 deletion docker/.env.example
Original file line number Diff line number Diff line change
Expand Up @@ -48,14 +48,18 @@ BLOB_STORAGE_PATH=/root/.flowise/storage
# see https://raw.githubusercontent.com/FlowiseAI/Flowise/main/packages/components/models.json for the format
# MODEL_LIST_CONFIG_JSON=/your_model_list_config_file_path

# STORAGE_TYPE=local (local | s3)
# STORAGE_TYPE=local (local | s3 | gcs)
# BLOB_STORAGE_PATH=/your_storage_path/.flowise/storage
# S3_STORAGE_BUCKET_NAME=flowise
# S3_STORAGE_ACCESS_KEY_ID=<your-access-key>
# S3_STORAGE_SECRET_ACCESS_KEY=<your-secret-key>
# S3_STORAGE_REGION=us-west-2
# S3_ENDPOINT_URL=<custom-s3-endpoint-url>
# S3_FORCE_PATH_STYLE=false
# GOOGLE_CLOUD_STORAGE_CREDENTIAL=/path/to/your-service-account-key.json
# GOOGLE_CLOUD_STORAGE_PROJ_ID=<your-gcp-project-id>
# GOOGLE_CLOUD_STORAGE_BUCKET_NAME=<the-bucket-name>
# GOOGLE_CLOUD_UNIFORM_BUCKET_ACCESS=true

# SHOW_COMMUNITY_NODES=true
# DISABLED_NODES=bufferMemory,chatOpenAI (comma separated list of node names to disable)
Expand Down
1 change: 1 addition & 0 deletions packages/components/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
"@gomomento/sdk": "^1.51.1",
"@gomomento/sdk-core": "^1.51.1",
"@google-ai/generativelanguage": "^2.5.0",
"@google-cloud/storage": "^7.15.2",
"@google/generative-ai": "^0.15.0",
"@huggingface/inference": "^2.6.1",
"@langchain/anthropic": "0.3.14",
Expand Down
91 changes: 91 additions & 0 deletions packages/components/src/storageUtils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import {
S3Client,
S3ClientConfig
} from '@aws-sdk/client-s3'
import { Storage } from '@google-cloud/storage'
import { Readable } from 'node:stream'
import { getUserHome } from './utils'
import sanitize from 'sanitize-filename'
Expand All @@ -34,6 +35,22 @@ export const addBase64FilesToStorage = async (fileBase64: string, chatflowid: st
})
await s3Client.send(putObjCmd)

fileNames.push(sanitizedFilename)
return 'FILE-STORAGE::' + JSON.stringify(fileNames)
} else if (storageType === 'gcs') {
const { bucket } = getGcsClient()
const splitDataURI = fileBase64.split(',')
const filename = splitDataURI.pop()?.split(':')[1] ?? ''
const bf = Buffer.from(splitDataURI.pop() || '', 'base64')
const mime = splitDataURI[0].split(':')[1].split(';')[0]
const sanitizedFilename = _sanitizeFilename(filename)
const file = bucket.file(path.join(chatflowid, sanitizedFilename))
await new Promise<void>((resolve, reject) => {
file.createWriteStream({ contentType: mime, metadata: { contentEncoding: 'base64' } })
.on('error', (err) => reject(err))
.on('finish', () => resolve())
.end(bf)
})
fileNames.push(sanitizedFilename)
return 'FILE-STORAGE::' + JSON.stringify(fileNames)
} else {
Expand Down Expand Up @@ -76,6 +93,17 @@ export const addArrayFilesToStorage = async (mime: string, bf: Buffer, fileName:
await s3Client.send(putObjCmd)
fileNames.push(sanitizedFilename)
return 'FILE-STORAGE::' + JSON.stringify(fileNames)
} else if (storageType === 'gcs') {
const { bucket } = getGcsClient()
const file = bucket.file(path.join(...paths, sanitizedFilename))
await new Promise<void>((resolve, reject) => {
file.createWriteStream()
.on('error', (err) => reject(err))
.on('finish', () => resolve())
.end(bf)
})
fileNames.push(sanitizedFilename)
return 'FILE-STORAGE::' + JSON.stringify(fileNames)
} else {
const dir = path.join(getStoragePath(), ...paths)
if (!fs.existsSync(dir)) {
Expand Down Expand Up @@ -109,6 +137,16 @@ export const addSingleFileToStorage = async (mime: string, bf: Buffer, fileName:
})
await s3Client.send(putObjCmd)
return 'FILE-STORAGE::' + sanitizedFilename
} else if (storageType === 'gcs') {
const { bucket } = getGcsClient()
const file = bucket.file(path.join(...paths, sanitizedFilename))
await new Promise<void>((resolve, reject) => {
file.createWriteStream({ contentType: mime, metadata: { contentEncoding: 'base64' } })
.on('error', (err) => reject(err))
.on('finish', () => resolve())
.end(bf)
})
return 'FILE-STORAGE::' + sanitizedFilename
} else {
const dir = path.join(getStoragePath(), ...paths)
if (!fs.existsSync(dir)) {
Expand Down Expand Up @@ -146,6 +184,11 @@ export const getFileFromUpload = async (filePath: string): Promise<Buffer> => {
// @ts-ignore
const buffer = Buffer.concat(response.Body.toArray())
return buffer
} else if (storageType === 'gcs') {
const { bucket } = getGcsClient()
const file = bucket.file(filePath)
const [buffer] = await file.download()
return buffer
} else {
return fs.readFileSync(filePath)
}
Expand Down Expand Up @@ -179,6 +222,11 @@ export const getFileFromStorage = async (file: string, ...paths: string[]): Prom
// @ts-ignore
const buffer = Buffer.concat(response.Body.toArray())
return buffer
} else if (storageType === 'gcs') {
const { bucket } = getGcsClient()
const file = bucket.file(path.join(...paths, sanitizedFilename))
const [buffer] = await file.download()
return buffer
} else {
const fileInStorage = path.join(getStoragePath(), ...paths, sanitizedFilename)
return fs.readFileSync(fileInStorage)
Expand Down Expand Up @@ -208,6 +256,9 @@ export const removeFilesFromStorage = async (...paths: string[]) => {
Key = Key.substring(1)
}
await _deleteS3Folder(Key)
} else if (storageType === 'gcs') {
const { bucket } = getGcsClient()
await bucket.deleteFiles({ prefix: path.join(...paths) + '/' })
} else {
const directory = path.join(getStoragePath(), ...paths)
_deleteLocalFolderRecursive(directory)
Expand All @@ -223,6 +274,9 @@ export const removeSpecificFileFromUpload = async (filePath: string) => {
Key = Key.substring(1)
}
await _deleteS3Folder(Key)
} else if (storageType === 'gcs') {
const { bucket } = getGcsClient()
await bucket.file(filePath).delete()
} else {
fs.unlinkSync(filePath)
}
Expand All @@ -237,6 +291,14 @@ export const removeSpecificFileFromStorage = async (...paths: string[]) => {
Key = Key.substring(1)
}
await _deleteS3Folder(Key)
} else if (storageType === 'gcs') {
const { bucket } = getGcsClient()
const fileName = paths.pop()
if (fileName) {
const sanitizedFilename = _sanitizeFilename(fileName)
paths.push(sanitizedFilename)
}
await bucket.file(path.join(...paths)).delete()
} else {
const fileName = paths.pop()
if (fileName) {
Expand All @@ -257,6 +319,9 @@ export const removeFolderFromStorage = async (...paths: string[]) => {
Key = Key.substring(1)
}
await _deleteS3Folder(Key)
} else if (storageType === 'gcs') {
const { bucket } = getGcsClient()
await bucket.deleteFiles({ prefix: path.join(...paths) + '/' })
} else {
const directory = path.join(getStoragePath(), ...paths)
_deleteLocalFolderRecursive(directory, true)
Expand Down Expand Up @@ -355,6 +420,10 @@ export const streamStorageFile = async (
const blob = await body.transformToByteArray()
return Buffer.from(blob)
}
} else if (storageType === 'gcs') {
const { bucket } = getGcsClient()
const [buffer] = await bucket.file(path.join(chatflowId, chatId, sanitizedFilename)).download()
return buffer
} else {
const filePath = path.join(getStoragePath(), chatflowId, chatId, sanitizedFilename)
//raise error if file path is not absolute
Expand All @@ -372,6 +441,28 @@ export const streamStorageFile = async (
}
}

export const getGcsClient = () => {
    // Build a Google Cloud Storage client plus a handle to the configured bucket.
    // Configuration is taken entirely from environment variables.
    const keyFilename = process.env.GOOGLE_CLOUD_STORAGE_CREDENTIAL
    const projectId = process.env.GOOGLE_CLOUD_STORAGE_PROJ_ID
    const bucketName = process.env.GOOGLE_CLOUD_STORAGE_BUCKET_NAME

    // Fail fast on missing mandatory configuration.
    if (!keyFilename) {
        throw new Error('GOOGLE_CLOUD_STORAGE_CREDENTIAL env variable is required')
    }
    if (!bucketName) {
        throw new Error('GOOGLE_CLOUD_STORAGE_BUCKET_NAME env variable is required')
    }

    // projectId is optional — when omitted the client infers it from the key file.
    const storage = projectId ? new Storage({ keyFilename, projectId }) : new Storage({ keyFilename })
    return { storage, bucket: storage.bucket(bucketName) }
}

export const getS3Config = () => {
const accessKeyId = process.env.S3_STORAGE_ACCESS_KEY_ID
const secretAccessKey = process.env.S3_STORAGE_SECRET_ACCESS_KEY
Expand Down
4 changes: 4 additions & 0 deletions packages/server/.env.example
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,10 @@ PORT=3000
# S3_STORAGE_REGION=us-west-2
# S3_ENDPOINT_URL=<custom-s3-endpoint-url>
# S3_FORCE_PATH_STYLE=false
# GOOGLE_CLOUD_STORAGE_CREDENTIAL=/path/to/your-service-account-key.json
# GOOGLE_CLOUD_STORAGE_PROJ_ID=<your-gcp-project-id>
# GOOGLE_CLOUD_STORAGE_BUCKET_NAME=<the-bucket-name>
# GOOGLE_CLOUD_UNIFORM_BUCKET_ACCESS=true

# SHOW_COMMUNITY_NODES=true
# DISABLED_NODES=bufferMemory,chatOpenAI (comma separated list of node names to disable)
Expand Down
2 changes: 2 additions & 0 deletions packages/server/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@
"license": "SEE LICENSE IN LICENSE.md",
"dependencies": {
"@aws-sdk/client-secrets-manager": "^3.699.0",
"@google-cloud/logging-winston": "^6.0.0",
"@oclif/core": "4.0.7",
"@opentelemetry/api": "^1.3.0",
"@opentelemetry/auto-instrumentations-node": "^0.52.0",
Expand Down Expand Up @@ -95,6 +96,7 @@
"moment": "^2.29.3",
"moment-timezone": "^0.5.34",
"multer": "^1.4.5-lts.1",
"multer-cloud-storage": "^4.0.0",
"multer-s3": "^3.0.1",
"mysql2": "^3.11.3",
"nim-container-manager": "^1.0.4",
Expand Down
9 changes: 9 additions & 0 deletions packages/server/src/commands/base.ts
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,10 @@ export abstract class BaseCommand extends Command {
S3_STORAGE_REGION: Flags.string(),
S3_ENDPOINT_URL: Flags.string(),
S3_FORCE_PATH_STYLE: Flags.string(),
GOOGLE_CLOUD_STORAGE_CREDENTIAL: Flags.string(),
GOOGLE_CLOUD_STORAGE_PROJ_ID: Flags.string(),
GOOGLE_CLOUD_STORAGE_BUCKET_NAME: Flags.string(),
GOOGLE_CLOUD_UNIFORM_BUCKET_ACCESS: Flags.string(),
SHOW_COMMUNITY_NODES: Flags.string(),
SECRETKEY_STORAGE_TYPE: Flags.string(),
SECRETKEY_PATH: Flags.string(),
Expand Down Expand Up @@ -182,6 +186,11 @@ export abstract class BaseCommand extends Command {
if (flags.S3_STORAGE_REGION) process.env.S3_STORAGE_REGION = flags.S3_STORAGE_REGION
if (flags.S3_ENDPOINT_URL) process.env.S3_ENDPOINT_URL = flags.S3_ENDPOINT_URL
if (flags.S3_FORCE_PATH_STYLE) process.env.S3_FORCE_PATH_STYLE = flags.S3_FORCE_PATH_STYLE
if (flags.GOOGLE_CLOUD_STORAGE_CREDENTIAL) process.env.GOOGLE_CLOUD_STORAGE_CREDENTIAL = flags.GOOGLE_CLOUD_STORAGE_CREDENTIAL
if (flags.GOOGLE_CLOUD_STORAGE_PROJ_ID) process.env.GOOGLE_CLOUD_STORAGE_PROJ_ID = flags.GOOGLE_CLOUD_STORAGE_PROJ_ID
if (flags.GOOGLE_CLOUD_STORAGE_BUCKET_NAME) process.env.GOOGLE_CLOUD_STORAGE_BUCKET_NAME = flags.GOOGLE_CLOUD_STORAGE_BUCKET_NAME
if (flags.GOOGLE_CLOUD_UNIFORM_BUCKET_ACCESS)
process.env.GOOGLE_CLOUD_UNIFORM_BUCKET_ACCESS = flags.GOOGLE_CLOUD_UNIFORM_BUCKET_ACCESS

// Queue
if (flags.MODE) process.env.MODE = flags.MODE
Expand Down
11 changes: 11 additions & 0 deletions packages/server/src/utils/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ import { randomBytes } from 'crypto'
import { AES, enc } from 'crypto-js'
import multer from 'multer'
import multerS3 from 'multer-s3'
import MulterGoogleCloudStorage from 'multer-cloud-storage'
import { ChatFlow } from '../database/entities/ChatFlow'
import { ChatMessage } from '../database/entities/ChatMessage'
import { Credential } from '../database/entities/Credential'
Expand Down Expand Up @@ -1800,6 +1801,16 @@ export const getMulterStorage = () => {
})
})
return upload
} else if (storageType === 'gcs') {
return multer({
storage: new MulterGoogleCloudStorage({
projectId: process.env.GOOGLE_CLOUD_STORAGE_PROJ_ID,
bucket: process.env.GOOGLE_CLOUD_STORAGE_BUCKET_NAME,
keyFilename: process.env.GOOGLE_CLOUD_STORAGE_CREDENTIAL,
uniformBucketLevelAccess: Boolean(process.env.GOOGLE_CLOUD_UNIFORM_BUCKET_ACCESS) ?? true,
destination: `uploads/${getOrgId()}`
})
})
} else {
return multer({ dest: getUploadPath() })
}
Expand Down
42 changes: 42 additions & 0 deletions packages/server/src/utils/logger.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ import config from './config' // should be replaced by node-config or similar
import { createLogger, transports, format } from 'winston'
import { NextFunction, Request, Response } from 'express'
import { S3ClientConfig } from '@aws-sdk/client-s3'
import { LoggingWinston } from '@google-cloud/logging-winston'


const { S3StreamLogger } = require('s3-streamlogger')

Expand All @@ -13,6 +15,11 @@ const { combine, timestamp, printf, errors } = format
let s3ServerStream: any
let s3ErrorStream: any
let s3ServerReqStream: any

let gcsServerStream: any
let gcsErrorStream: any
let gcsServerReqStream: any

if (process.env.STORAGE_TYPE === 's3') {
const accessKeyId = process.env.S3_STORAGE_ACCESS_KEY_ID
const secretAccessKey = process.env.S3_STORAGE_SECRET_ACCESS_KEY
Expand Down Expand Up @@ -53,6 +60,29 @@ if (process.env.STORAGE_TYPE === 's3') {
})
}

if (process.env.STORAGE_TYPE === 'gcs') {
    // Shared transport options for all three Cloud Logging streams.
    // Named gcsLogConfig (not `config`) to avoid shadowing the module-level
    // `config` import used below for `config.logging.dir`.
    // defaultCallback surfaces transport errors that would otherwise be lost.
    const gcsLogConfig = {
        projectId: process.env.GOOGLE_CLOUD_STORAGE_PROJ_ID,
        keyFilename: process.env.GOOGLE_CLOUD_STORAGE_CREDENTIAL,
        defaultCallback: (err: any) => {
            if (err) {
                // Log to stderr: the logging transport itself failed, so we
                // cannot rely on winston to report its own error.
                console.error('Error occurred: ' + err)
            }
        }
    }
    gcsServerStream = new LoggingWinston({
        ...gcsLogConfig,
        logName: 'server'
    })
    gcsErrorStream = new LoggingWinston({
        ...gcsLogConfig,
        logName: 'error'
    })
    gcsServerReqStream = new LoggingWinston({
        ...gcsLogConfig,
        logName: 'requests'
    })
}
// expect the log dir be relative to the projects root
const logDir = config.logging.dir

Expand Down Expand Up @@ -94,6 +124,9 @@ const logger = createLogger({
stream: s3ServerStream
})
]
: []),
...(process.env.STORAGE_TYPE === 'gcs'
? [gcsServerStream]
: [])
],
exceptionHandlers: [
Expand All @@ -110,6 +143,9 @@ const logger = createLogger({
stream: s3ErrorStream
})
]
: []),
...(process.env.STORAGE_TYPE === 'gcs'
? [gcsErrorStream]
: [])
],
rejectionHandlers: [
Expand All @@ -126,6 +162,9 @@ const logger = createLogger({
stream: s3ErrorStream
})
]
: []),
...(process.env.STORAGE_TYPE === 'gcs'
? [gcsErrorStream]
: [])
]
})
Expand Down Expand Up @@ -161,6 +200,9 @@ export function expressRequestLogger(req: Request, res: Response, next: NextFunc
stream: s3ServerReqStream
})
]
: []),
...(process.env.STORAGE_TYPE === 'gcs'
? [gcsServerReqStream]
: [])
]
})
Expand Down
Loading