diff --git a/docker/.env.example b/docker/.env.example index a1ac2682cc7..aedf4a8a267 100644 --- a/docker/.env.example +++ b/docker/.env.example @@ -78,6 +78,7 @@ BLOB_STORAGE_PATH=/root/.flowise/storage # QUEUE_NAME=flowise-queue # QUEUE_REDIS_EVENT_STREAM_MAX_LEN=100000 # WORKER_CONCURRENCY=300 +# REDIS_URL= # REDIS_HOST=localhost # REDIS_PORT=6379 # REDIS_USERNAME= diff --git a/docker/docker-compose.yml b/docker/docker-compose.yml index 2712e9f1419..42b81bab29c 100644 --- a/docker/docker-compose.yml +++ b/docker/docker-compose.yml @@ -34,6 +34,19 @@ services: - GLOBAL_AGENT_HTTPS_PROXY=${GLOBAL_AGENT_HTTPS_PROXY} - GLOBAL_AGENT_NO_PROXY=${GLOBAL_AGENT_NO_PROXY} - DISABLED_NODES=${DISABLED_NODES} + - MODE=${MODE} + - WORKER_CONCURRENCY=${WORKER_CONCURRENCY} + - QUEUE_NAME=${QUEUE_NAME} + - QUEUE_REDIS_EVENT_STREAM_MAX_LEN=${QUEUE_REDIS_EVENT_STREAM_MAX_LEN} + - REDIS_URL=${REDIS_URL} + - REDIS_HOST=${REDIS_HOST} + - REDIS_PORT=${REDIS_PORT} + - REDIS_PASSWORD=${REDIS_PASSWORD} + - REDIS_USERNAME=${REDIS_USERNAME} + - REDIS_TLS=${REDIS_TLS} + - REDIS_CERT=${REDIS_CERT} + - REDIS_KEY=${REDIS_KEY} + - REDIS_CA=${REDIS_CA} ports: - '${PORT}:${PORT}' volumes: diff --git a/docker/worker/README.md b/docker/worker/README.md new file mode 100644 index 00000000000..82769c1e2a1 --- /dev/null +++ b/docker/worker/README.md @@ -0,0 +1,24 @@ +# Flowise Worker + +By utilizing worker instances when operating in queue mode, Flowise can be scaled horizontally by adding more workers to handle increased workloads or scaled down by removing workers when demand decreases. + +Here’s an overview of the process: + +1. The primary Flowise instance sends an execution ID to a message broker, Redis, which maintains a queue of pending executions, allowing the next available worker to process them. +2. A worker from the pool retrieves a message from Redis. + The worker starts execute the actual job. +3. Once the execution is completed, the worker alerts the main instance that the execution is finished. + +# How to use + +## Setting up Main Server: + +1. Follow [setup guide](https://github.com/FlowiseAI/Flowise/blob/main/docker/README.md) +2. In the `.env.example`, setup all the necessary env variables for `QUEUE CONFIGURATION` + +## Setting up Worker: + +1. Copy paste the same `.env` file used to setup main server. Change the `PORT` to other available port numbers. Ex: 5566 +2. `docker compose up -d` +3. Open [http://localhost:5566](http://localhost:5566) +4. You can bring the worker container down by `docker compose stop` diff --git a/docker/worker/docker-compose.yml b/docker/worker/docker-compose.yml new file mode 100644 index 00000000000..88a8631d0df --- /dev/null +++ b/docker/worker/docker-compose.yml @@ -0,0 +1,54 @@ +version: '3.1' + +services: + flowise: + image: flowiseai/flowise + restart: always + environment: + - PORT=${PORT} + - CORS_ORIGINS=${CORS_ORIGINS} + - IFRAME_ORIGINS=${IFRAME_ORIGINS} + - FLOWISE_USERNAME=${FLOWISE_USERNAME} + - FLOWISE_PASSWORD=${FLOWISE_PASSWORD} + - FLOWISE_FILE_SIZE_LIMIT=${FLOWISE_FILE_SIZE_LIMIT} + - DEBUG=${DEBUG} + - DATABASE_PATH=${DATABASE_PATH} + - DATABASE_TYPE=${DATABASE_TYPE} + - DATABASE_PORT=${DATABASE_PORT} + - DATABASE_HOST=${DATABASE_HOST} + - DATABASE_NAME=${DATABASE_NAME} + - DATABASE_USER=${DATABASE_USER} + - DATABASE_PASSWORD=${DATABASE_PASSWORD} + - DATABASE_SSL=${DATABASE_SSL} + - DATABASE_SSL_KEY_BASE64=${DATABASE_SSL_KEY_BASE64} + - APIKEY_STORAGE_TYPE=${APIKEY_STORAGE_TYPE} + - APIKEY_PATH=${APIKEY_PATH} + - SECRETKEY_PATH=${SECRETKEY_PATH} + - FLOWISE_SECRETKEY_OVERWRITE=${FLOWISE_SECRETKEY_OVERWRITE} + - LOG_LEVEL=${LOG_LEVEL} + - LOG_PATH=${LOG_PATH} + - BLOB_STORAGE_PATH=${BLOB_STORAGE_PATH} + - DISABLE_FLOWISE_TELEMETRY=${DISABLE_FLOWISE_TELEMETRY} + - MODEL_LIST_CONFIG_JSON=${MODEL_LIST_CONFIG_JSON} + - GLOBAL_AGENT_HTTP_PROXY=${GLOBAL_AGENT_HTTP_PROXY} + - GLOBAL_AGENT_HTTPS_PROXY=${GLOBAL_AGENT_HTTPS_PROXY} + - GLOBAL_AGENT_NO_PROXY=${GLOBAL_AGENT_NO_PROXY} + - DISABLED_NODES=${DISABLED_NODES} + - MODE=${MODE} + - WORKER_CONCURRENCY=${WORKER_CONCURRENCY} + - QUEUE_NAME=${QUEUE_NAME} + - QUEUE_REDIS_EVENT_STREAM_MAX_LEN=${QUEUE_REDIS_EVENT_STREAM_MAX_LEN} + - REDIS_URL=${REDIS_URL} + - REDIS_HOST=${REDIS_HOST} + - REDIS_PORT=${REDIS_PORT} + - REDIS_PASSWORD=${REDIS_PASSWORD} + - REDIS_USERNAME=${REDIS_USERNAME} + - REDIS_TLS=${REDIS_TLS} + - REDIS_CERT=${REDIS_CERT} + - REDIS_KEY=${REDIS_KEY} + - REDIS_CA=${REDIS_CA} + ports: + - '${PORT}:${PORT}' + volumes: + - ~/.flowise:/root/.flowise + entrypoint: /bin/sh -c "sleep 3; flowise worker" diff --git a/packages/server/.env.example b/packages/server/.env.example index e7e5c48c8b1..d657ba1a4d4 100644 --- a/packages/server/.env.example +++ b/packages/server/.env.example @@ -78,6 +78,7 @@ PORT=3000 # QUEUE_NAME=flowise-queue # QUEUE_REDIS_EVENT_STREAM_MAX_LEN=100000 # WORKER_CONCURRENCY=300 +# REDIS_URL= # REDIS_HOST=localhost # REDIS_PORT=6379 # REDIS_USERNAME= diff --git a/packages/server/src/CachePool.ts b/packages/server/src/CachePool.ts index 69dd7ffc4ac..b8662a8e9fa 100644 --- a/packages/server/src/CachePool.ts +++ b/packages/server/src/CachePool.ts @@ -11,20 +11,24 @@ export class CachePool { constructor() { if (process.env.MODE === MODE.QUEUE) { - this.redisClient = new Redis({ - host: process.env.REDIS_HOST || 'localhost', - port: parseInt(process.env.REDIS_PORT || '6379'), - username: process.env.REDIS_USERNAME || undefined, - password: process.env.REDIS_PASSWORD || undefined, - tls: - process.env.REDIS_TLS === 'true' - ? { - cert: process.env.REDIS_CERT ? Buffer.from(process.env.REDIS_CERT, 'base64') : undefined, - key: process.env.REDIS_KEY ? Buffer.from(process.env.REDIS_KEY, 'base64') : undefined, - ca: process.env.REDIS_CA ? Buffer.from(process.env.REDIS_CA, 'base64') : undefined - } - : undefined - }) + if (process.env.REDIS_URL) { + this.redisClient = new Redis(process.env.REDIS_URL) + } else { + this.redisClient = new Redis({ + host: process.env.REDIS_HOST || 'localhost', + port: parseInt(process.env.REDIS_PORT || '6379'), + username: process.env.REDIS_USERNAME || undefined, + password: process.env.REDIS_PASSWORD || undefined, + tls: + process.env.REDIS_TLS === 'true' + ? { + cert: process.env.REDIS_CERT ? Buffer.from(process.env.REDIS_CERT, 'base64') : undefined, + key: process.env.REDIS_KEY ? Buffer.from(process.env.REDIS_KEY, 'base64') : undefined, + ca: process.env.REDIS_CA ? Buffer.from(process.env.REDIS_CA, 'base64') : undefined + } + : undefined + }) + } } } diff --git a/packages/server/src/commands/base.ts b/packages/server/src/commands/base.ts index 412388160ed..09d4db55e4c 100644 --- a/packages/server/src/commands/base.ts +++ b/packages/server/src/commands/base.ts @@ -57,6 +57,7 @@ export abstract class BaseCommand extends Command { WORKER_CONCURRENCY: Flags.string(), QUEUE_NAME: Flags.string(), QUEUE_REDIS_EVENT_STREAM_MAX_LEN: Flags.string(), + REDIS_URL: Flags.string(), REDIS_HOST: Flags.string(), REDIS_PORT: Flags.string(), REDIS_USERNAME: Flags.string(), @@ -176,6 +177,7 @@ export abstract class BaseCommand extends Command { // Queue if (flags.MODE) process.env.MODE = flags.MODE + if (flags.REDIS_URL) process.env.REDIS_URL = flags.REDIS_URL if (flags.REDIS_HOST) process.env.REDIS_HOST = flags.REDIS_HOST if (flags.REDIS_PORT) process.env.REDIS_PORT = flags.REDIS_PORT if (flags.REDIS_USERNAME) process.env.REDIS_USERNAME = flags.REDIS_USERNAME diff --git a/packages/server/src/commands/worker.ts b/packages/server/src/commands/worker.ts index 583c253a691..89810b6d818 100644 --- a/packages/server/src/commands/worker.ts +++ b/packages/server/src/commands/worker.ts @@ -37,7 +37,7 @@ export default class Worker extends BaseCommand { logger.info(`Prediction Worker ${this.predictionWorkerId} created`) const predictionQueueName = predictionQueue.getQueueName() - const queueEvents = new QueueEvents(predictionQueueName) + const queueEvents = new QueueEvents(predictionQueueName, { connection: queueManager.getConnection() }) queueEvents.on('abort', async ({ id }: { id: string }) => { abortControllerPool.abort(id) diff --git a/packages/server/src/queue/PredictionQueue.ts b/packages/server/src/queue/PredictionQueue.ts index f7a7099a310..dea8bc42da7 100644 --- a/packages/server/src/queue/PredictionQueue.ts +++ b/packages/server/src/queue/PredictionQueue.ts @@ -1,4 +1,3 @@ -import dotenv from 'dotenv' import { DataSource } from 'typeorm' import { executeFlow } from '../utils/buildChatflow' import { IComponentNodes, IExecuteFlowParams } from '../Interface' @@ -9,8 +8,6 @@ import { AbortControllerPool } from '../AbortControllerPool' import { BaseQueue } from './BaseQueue' import { RedisOptions } from 'bullmq' -dotenv.config() - interface PredictionQueueOptions { appDataSource: DataSource telemetry: Telemetry diff --git a/packages/server/src/queue/QueueManager.ts b/packages/server/src/queue/QueueManager.ts index 9fdbd6765aa..33d40ca0152 100644 --- a/packages/server/src/queue/QueueManager.ts +++ b/packages/server/src/queue/QueueManager.ts @@ -1,4 +1,3 @@ -import dotenv from 'dotenv' import { BaseQueue } from './BaseQueue' import { PredictionQueue } from './PredictionQueue' import { UpsertQueue } from './UpsertQueue' @@ -12,8 +11,6 @@ import { createBullBoard } from 'bull-board' import { BullMQAdapter } from 'bull-board/bullMQAdapter' import { Express } from 'express' -dotenv.config() - const QUEUE_NAME = process.env.QUEUE_NAME || 'flowise-queue' type QUEUE_TYPE = 'prediction' | 'upsert' @@ -25,19 +22,25 @@ export class QueueManager { private bullBoardRouter?: Express private constructor() { + let tlsOpts = undefined + if (process.env.REDIS_URL && process.env.REDIS_URL.startsWith('rediss://')) { + tlsOpts = { + rejectUnauthorized: false + } + } else if (process.env.REDIS_TLS === 'true') { + tlsOpts = { + cert: process.env.REDIS_CERT ? Buffer.from(process.env.REDIS_CERT, 'base64') : undefined, + key: process.env.REDIS_KEY ? Buffer.from(process.env.REDIS_KEY, 'base64') : undefined, + ca: process.env.REDIS_CA ? Buffer.from(process.env.REDIS_CA, 'base64') : undefined + } + } this.connection = { + url: process.env.REDIS_URL || undefined, host: process.env.REDIS_HOST || 'localhost', port: parseInt(process.env.REDIS_PORT || '6379'), username: process.env.REDIS_USERNAME || undefined, password: process.env.REDIS_PASSWORD || undefined, - tls: - process.env.REDIS_TLS === 'true' - ? { - cert: process.env.REDIS_CERT ? Buffer.from(process.env.REDIS_CERT, 'base64') : undefined, - key: process.env.REDIS_KEY ? Buffer.from(process.env.REDIS_KEY, 'base64') : undefined, - ca: process.env.REDIS_CA ? Buffer.from(process.env.REDIS_CA, 'base64') : undefined - } - : undefined + tls: tlsOpts } } diff --git a/packages/server/src/queue/RedisEventPublisher.ts b/packages/server/src/queue/RedisEventPublisher.ts index 7c78383b572..946aae93a41 100644 --- a/packages/server/src/queue/RedisEventPublisher.ts +++ b/packages/server/src/queue/RedisEventPublisher.ts @@ -5,7 +5,24 @@ export class RedisEventPublisher implements IServerSideEventStreamer { private redisPublisher: ReturnType constructor() { - this.redisPublisher = createClient() + if (process.env.REDIS_URL) { + this.redisPublisher = createClient({ + url: process.env.REDIS_URL + }) + } else { + this.redisPublisher = createClient({ + username: process.env.REDIS_USERNAME || undefined, + password: process.env.REDIS_PASSWORD || undefined, + socket: { + host: process.env.REDIS_HOST || 'localhost', + port: parseInt(process.env.REDIS_PORT || '6379'), + tls: process.env.REDIS_TLS === 'true', + cert: process.env.REDIS_CERT ? Buffer.from(process.env.REDIS_CERT, 'base64') : undefined, + key: process.env.REDIS_KEY ? Buffer.from(process.env.REDIS_KEY, 'base64') : undefined, + ca: process.env.REDIS_CA ? Buffer.from(process.env.REDIS_CA, 'base64') : undefined + } + }) + } } async connect() { diff --git a/packages/server/src/queue/RedisEventSubscriber.ts b/packages/server/src/queue/RedisEventSubscriber.ts index 5e7d0adb843..81b18e3cc84 100644 --- a/packages/server/src/queue/RedisEventSubscriber.ts +++ b/packages/server/src/queue/RedisEventSubscriber.ts @@ -7,7 +7,24 @@ export class RedisEventSubscriber { private subscribedChannels: Set = new Set() constructor(sseStreamer: SSEStreamer) { - this.redisSubscriber = createClient() + if (process.env.REDIS_URL) { + this.redisSubscriber = createClient({ + url: process.env.REDIS_URL + }) + } else { + this.redisSubscriber = createClient({ + username: process.env.REDIS_USERNAME || undefined, + password: process.env.REDIS_PASSWORD || undefined, + socket: { + host: process.env.REDIS_HOST || 'localhost', + port: parseInt(process.env.REDIS_PORT || '6379'), + tls: process.env.REDIS_TLS === 'true', + cert: process.env.REDIS_CERT ? Buffer.from(process.env.REDIS_CERT, 'base64') : undefined, + key: process.env.REDIS_KEY ? Buffer.from(process.env.REDIS_KEY, 'base64') : undefined, + ca: process.env.REDIS_CA ? Buffer.from(process.env.REDIS_CA, 'base64') : undefined + } + }) + } this.sseStreamer = sseStreamer } diff --git a/packages/server/src/queue/UpsertQueue.ts b/packages/server/src/queue/UpsertQueue.ts index 4ef30fd070a..7d9e4b39d7f 100644 --- a/packages/server/src/queue/UpsertQueue.ts +++ b/packages/server/src/queue/UpsertQueue.ts @@ -1,4 +1,3 @@ -import dotenv from 'dotenv' import { DataSource } from 'typeorm' import { IComponentNodes, IExecuteDocStoreUpsert, IExecuteFlowParams, IExecuteProcessLoader, IExecuteVectorStoreInsert } from '../Interface' import { Telemetry } from '../utils/telemetry' @@ -9,8 +8,6 @@ import { executeDocStoreUpsert, insertIntoVectorStore, processLoader } from '../ import { RedisOptions } from 'bullmq' import logger from '../utils/logger' -dotenv.config() - interface UpsertQueueOptions { appDataSource: DataSource telemetry: Telemetry diff --git a/packages/server/src/utils/rateLimit.ts b/packages/server/src/utils/rateLimit.ts index b13deacf20b..a7364f6c14b 100644 --- a/packages/server/src/utils/rateLimit.ts +++ b/packages/server/src/utils/rateLimit.ts @@ -13,20 +13,24 @@ export class RateLimiterManager { constructor() { if (process.env.MODE === MODE.QUEUE) { - this.redisClient = new Redis({ - host: process.env.REDIS_HOST || 'localhost', - port: parseInt(process.env.REDIS_PORT || '6379'), - username: process.env.REDIS_USERNAME || undefined, - password: process.env.REDIS_PASSWORD || undefined, - tls: - process.env.REDIS_TLS === 'true' - ? { - cert: process.env.REDIS_CERT ? Buffer.from(process.env.REDIS_CERT, 'base64') : undefined, - key: process.env.REDIS_KEY ? Buffer.from(process.env.REDIS_KEY, 'base64') : undefined, - ca: process.env.REDIS_CA ? Buffer.from(process.env.REDIS_CA, 'base64') : undefined - } - : undefined - }) + if (process.env.REDIS_URL) { + this.redisClient = new Redis(process.env.REDIS_URL) + } else { + this.redisClient = new Redis({ + host: process.env.REDIS_HOST || 'localhost', + port: parseInt(process.env.REDIS_PORT || '6379'), + username: process.env.REDIS_USERNAME || undefined, + password: process.env.REDIS_PASSWORD || undefined, + tls: + process.env.REDIS_TLS === 'true' + ? { + cert: process.env.REDIS_CERT ? Buffer.from(process.env.REDIS_CERT, 'base64') : undefined, + key: process.env.REDIS_KEY ? Buffer.from(process.env.REDIS_KEY, 'base64') : undefined, + ca: process.env.REDIS_CA ? Buffer.from(process.env.REDIS_CA, 'base64') : undefined + } + : undefined + }) + } } }