Skip to content

Commit

Permalink
update redis configuration
Browse files Browse the repository at this point in the history
  • Loading branch information
HenryHengZJ committed Jan 6, 2025
1 parent 55cdf93 commit 13d69b7
Show file tree
Hide file tree
Showing 14 changed files with 182 additions and 48 deletions.
1 change: 1 addition & 0 deletions docker/.env.example
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ BLOB_STORAGE_PATH=/root/.flowise/storage
# QUEUE_NAME=flowise-queue
# QUEUE_REDIS_EVENT_STREAM_MAX_LEN=100000
# WORKER_CONCURRENCY=300
# REDIS_URL=
# REDIS_HOST=localhost
# REDIS_PORT=6379
# REDIS_USERNAME=
Expand Down
13 changes: 13 additions & 0 deletions docker/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,19 @@ services:
- GLOBAL_AGENT_HTTPS_PROXY=${GLOBAL_AGENT_HTTPS_PROXY}
- GLOBAL_AGENT_NO_PROXY=${GLOBAL_AGENT_NO_PROXY}
- DISABLED_NODES=${DISABLED_NODES}
- MODE=${MODE}
- WORKER_CONCURRENCY=${WORKER_CONCURRENCY}
- QUEUE_NAME=${QUEUE_NAME}
- QUEUE_REDIS_EVENT_STREAM_MAX_LEN=${QUEUE_REDIS_EVENT_STREAM_MAX_LEN}
- REDIS_URL=${REDIS_URL}
- REDIS_HOST=${REDIS_HOST}
- REDIS_PORT=${REDIS_PORT}
- REDIS_PASSWORD=${REDIS_PASSWORD}
- REDIS_USERNAME=${REDIS_USERNAME}
- REDIS_TLS=${REDIS_TLS}
- REDIS_CERT=${REDIS_CERT}
- REDIS_KEY=${REDIS_KEY}
- REDIS_CA=${REDIS_CA}
ports:
- '${PORT}:${PORT}'
volumes:
Expand Down
24 changes: 24 additions & 0 deletions docker/worker/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
# Flowise Worker

By utilizing worker instances when operating in queue mode, Flowise can be scaled horizontally by adding more workers to handle increased workloads or scaled down by removing workers when demand decreases.

Here’s an overview of the process:

1. The primary Flowise instance sends an execution ID to a message broker, Redis, which maintains a queue of pending executions, allowing the next available worker to process them.
2. A worker from the pool retrieves a message from Redis.
The worker starts execute the actual job.
3. Once the execution is completed, the worker alerts the main instance that the execution is finished.

# How to use

## Setting up Main Server:

1. Follow [setup guide](https://github.com/FlowiseAI/Flowise/blob/main/docker/README.md)
2. In the `.env.example`, setup all the necessary env variables for `QUEUE CONFIGURATION`

## Setting up Worker:

1. Copy paste the same `.env` file used to setup main server. Change the `PORT` to other available port numbers. Ex: 5566
2. `docker compose up -d`
3. Open [http://localhost:5566](http://localhost:5566)
4. You can bring the worker container down by `docker compose stop`
54 changes: 54 additions & 0 deletions docker/worker/docker-compose.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
version: '3.1'

services:
flowise:
image: flowiseai/flowise
restart: always
environment:
- PORT=${PORT}
- CORS_ORIGINS=${CORS_ORIGINS}
- IFRAME_ORIGINS=${IFRAME_ORIGINS}
- FLOWISE_USERNAME=${FLOWISE_USERNAME}
- FLOWISE_PASSWORD=${FLOWISE_PASSWORD}
- FLOWISE_FILE_SIZE_LIMIT=${FLOWISE_FILE_SIZE_LIMIT}
- DEBUG=${DEBUG}
- DATABASE_PATH=${DATABASE_PATH}
- DATABASE_TYPE=${DATABASE_TYPE}
- DATABASE_PORT=${DATABASE_PORT}
- DATABASE_HOST=${DATABASE_HOST}
- DATABASE_NAME=${DATABASE_NAME}
- DATABASE_USER=${DATABASE_USER}
- DATABASE_PASSWORD=${DATABASE_PASSWORD}
- DATABASE_SSL=${DATABASE_SSL}
- DATABASE_SSL_KEY_BASE64=${DATABASE_SSL_KEY_BASE64}
- APIKEY_STORAGE_TYPE=${APIKEY_STORAGE_TYPE}
- APIKEY_PATH=${APIKEY_PATH}
- SECRETKEY_PATH=${SECRETKEY_PATH}
- FLOWISE_SECRETKEY_OVERWRITE=${FLOWISE_SECRETKEY_OVERWRITE}
- LOG_LEVEL=${LOG_LEVEL}
- LOG_PATH=${LOG_PATH}
- BLOB_STORAGE_PATH=${BLOB_STORAGE_PATH}
- DISABLE_FLOWISE_TELEMETRY=${DISABLE_FLOWISE_TELEMETRY}
- MODEL_LIST_CONFIG_JSON=${MODEL_LIST_CONFIG_JSON}
- GLOBAL_AGENT_HTTP_PROXY=${GLOBAL_AGENT_HTTP_PROXY}
- GLOBAL_AGENT_HTTPS_PROXY=${GLOBAL_AGENT_HTTPS_PROXY}
- GLOBAL_AGENT_NO_PROXY=${GLOBAL_AGENT_NO_PROXY}
- DISABLED_NODES=${DISABLED_NODES}
- MODE=${MODE}
- WORKER_CONCURRENCY=${WORKER_CONCURRENCY}
- QUEUE_NAME=${QUEUE_NAME}
- QUEUE_REDIS_EVENT_STREAM_MAX_LEN=${QUEUE_REDIS_EVENT_STREAM_MAX_LEN}
- REDIS_URL=${REDIS_URL}
- REDIS_HOST=${REDIS_HOST}
- REDIS_PORT=${REDIS_PORT}
- REDIS_PASSWORD=${REDIS_PASSWORD}
- REDIS_USERNAME=${REDIS_USERNAME}
- REDIS_TLS=${REDIS_TLS}
- REDIS_CERT=${REDIS_CERT}
- REDIS_KEY=${REDIS_KEY}
- REDIS_CA=${REDIS_CA}
ports:
- '${PORT}:${PORT}'
volumes:
- ~/.flowise:/root/.flowise
entrypoint: /bin/sh -c "sleep 3; flowise worker"
1 change: 1 addition & 0 deletions packages/server/.env.example
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ PORT=3000
# QUEUE_NAME=flowise-queue
# QUEUE_REDIS_EVENT_STREAM_MAX_LEN=100000
# WORKER_CONCURRENCY=300
# REDIS_URL=
# REDIS_HOST=localhost
# REDIS_PORT=6379
# REDIS_USERNAME=
Expand Down
32 changes: 18 additions & 14 deletions packages/server/src/CachePool.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,20 +11,24 @@ export class CachePool {

constructor() {
if (process.env.MODE === MODE.QUEUE) {
this.redisClient = new Redis({
host: process.env.REDIS_HOST || 'localhost',
port: parseInt(process.env.REDIS_PORT || '6379'),
username: process.env.REDIS_USERNAME || undefined,
password: process.env.REDIS_PASSWORD || undefined,
tls:
process.env.REDIS_TLS === 'true'
? {
cert: process.env.REDIS_CERT ? Buffer.from(process.env.REDIS_CERT, 'base64') : undefined,
key: process.env.REDIS_KEY ? Buffer.from(process.env.REDIS_KEY, 'base64') : undefined,
ca: process.env.REDIS_CA ? Buffer.from(process.env.REDIS_CA, 'base64') : undefined
}
: undefined
})
if (process.env.REDIS_URL) {
this.redisClient = new Redis(process.env.REDIS_URL)
} else {
this.redisClient = new Redis({
host: process.env.REDIS_HOST || 'localhost',
port: parseInt(process.env.REDIS_PORT || '6379'),
username: process.env.REDIS_USERNAME || undefined,
password: process.env.REDIS_PASSWORD || undefined,
tls:
process.env.REDIS_TLS === 'true'
? {
cert: process.env.REDIS_CERT ? Buffer.from(process.env.REDIS_CERT, 'base64') : undefined,
key: process.env.REDIS_KEY ? Buffer.from(process.env.REDIS_KEY, 'base64') : undefined,
ca: process.env.REDIS_CA ? Buffer.from(process.env.REDIS_CA, 'base64') : undefined
}
: undefined
})
}
}
}

Expand Down
2 changes: 2 additions & 0 deletions packages/server/src/commands/base.ts
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ export abstract class BaseCommand extends Command {
WORKER_CONCURRENCY: Flags.string(),
QUEUE_NAME: Flags.string(),
QUEUE_REDIS_EVENT_STREAM_MAX_LEN: Flags.string(),
REDIS_URL: Flags.string(),
REDIS_HOST: Flags.string(),
REDIS_PORT: Flags.string(),
REDIS_USERNAME: Flags.string(),
Expand Down Expand Up @@ -176,6 +177,7 @@ export abstract class BaseCommand extends Command {

// Queue
if (flags.MODE) process.env.MODE = flags.MODE
if (flags.REDIS_URL) process.env.REDIS_URL = flags.REDIS_URL
if (flags.REDIS_HOST) process.env.REDIS_HOST = flags.REDIS_HOST
if (flags.REDIS_PORT) process.env.REDIS_PORT = flags.REDIS_PORT
if (flags.REDIS_USERNAME) process.env.REDIS_USERNAME = flags.REDIS_USERNAME
Expand Down
2 changes: 1 addition & 1 deletion packages/server/src/commands/worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ export default class Worker extends BaseCommand {
logger.info(`Prediction Worker ${this.predictionWorkerId} created`)

const predictionQueueName = predictionQueue.getQueueName()
const queueEvents = new QueueEvents(predictionQueueName)
const queueEvents = new QueueEvents(predictionQueueName, { connection: queueManager.getConnection() })

queueEvents.on<CustomListener>('abort', async ({ id }: { id: string }) => {
abortControllerPool.abort(id)
Expand Down
3 changes: 0 additions & 3 deletions packages/server/src/queue/PredictionQueue.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
import dotenv from 'dotenv'
import { DataSource } from 'typeorm'
import { executeFlow } from '../utils/buildChatflow'
import { IComponentNodes, IExecuteFlowParams } from '../Interface'
Expand All @@ -9,8 +8,6 @@ import { AbortControllerPool } from '../AbortControllerPool'
import { BaseQueue } from './BaseQueue'
import { RedisOptions } from 'bullmq'

dotenv.config()

interface PredictionQueueOptions {
appDataSource: DataSource
telemetry: Telemetry
Expand Down
25 changes: 14 additions & 11 deletions packages/server/src/queue/QueueManager.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
import dotenv from 'dotenv'
import { BaseQueue } from './BaseQueue'
import { PredictionQueue } from './PredictionQueue'
import { UpsertQueue } from './UpsertQueue'
Expand All @@ -12,8 +11,6 @@ import { createBullBoard } from 'bull-board'
import { BullMQAdapter } from 'bull-board/bullMQAdapter'
import { Express } from 'express'

dotenv.config()

const QUEUE_NAME = process.env.QUEUE_NAME || 'flowise-queue'

type QUEUE_TYPE = 'prediction' | 'upsert'
Expand All @@ -25,19 +22,25 @@ export class QueueManager {
private bullBoardRouter?: Express

private constructor() {
let tlsOpts = undefined
if (process.env.REDIS_URL && process.env.REDIS_URL.startsWith('rediss://')) {
tlsOpts = {
rejectUnauthorized: false
}
} else if (process.env.REDIS_TLS === 'true') {
tlsOpts = {
cert: process.env.REDIS_CERT ? Buffer.from(process.env.REDIS_CERT, 'base64') : undefined,
key: process.env.REDIS_KEY ? Buffer.from(process.env.REDIS_KEY, 'base64') : undefined,
ca: process.env.REDIS_CA ? Buffer.from(process.env.REDIS_CA, 'base64') : undefined
}
}
this.connection = {
url: process.env.REDIS_URL || undefined,
host: process.env.REDIS_HOST || 'localhost',
port: parseInt(process.env.REDIS_PORT || '6379'),
username: process.env.REDIS_USERNAME || undefined,
password: process.env.REDIS_PASSWORD || undefined,
tls:
process.env.REDIS_TLS === 'true'
? {
cert: process.env.REDIS_CERT ? Buffer.from(process.env.REDIS_CERT, 'base64') : undefined,
key: process.env.REDIS_KEY ? Buffer.from(process.env.REDIS_KEY, 'base64') : undefined,
ca: process.env.REDIS_CA ? Buffer.from(process.env.REDIS_CA, 'base64') : undefined
}
: undefined
tls: tlsOpts
}
}

Expand Down
19 changes: 18 additions & 1 deletion packages/server/src/queue/RedisEventPublisher.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,24 @@ export class RedisEventPublisher implements IServerSideEventStreamer {
private redisPublisher: ReturnType<typeof createClient>

constructor() {
this.redisPublisher = createClient()
if (process.env.REDIS_URL) {
this.redisPublisher = createClient({
url: process.env.REDIS_URL
})
} else {
this.redisPublisher = createClient({
username: process.env.REDIS_USERNAME || undefined,
password: process.env.REDIS_PASSWORD || undefined,
socket: {
host: process.env.REDIS_HOST || 'localhost',
port: parseInt(process.env.REDIS_PORT || '6379'),
tls: process.env.REDIS_TLS === 'true',
cert: process.env.REDIS_CERT ? Buffer.from(process.env.REDIS_CERT, 'base64') : undefined,
key: process.env.REDIS_KEY ? Buffer.from(process.env.REDIS_KEY, 'base64') : undefined,
ca: process.env.REDIS_CA ? Buffer.from(process.env.REDIS_CA, 'base64') : undefined
}
})
}
}

async connect() {
Expand Down
19 changes: 18 additions & 1 deletion packages/server/src/queue/RedisEventSubscriber.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,24 @@ export class RedisEventSubscriber {
private subscribedChannels: Set<string> = new Set()

constructor(sseStreamer: SSEStreamer) {
this.redisSubscriber = createClient()
if (process.env.REDIS_URL) {
this.redisSubscriber = createClient({
url: process.env.REDIS_URL
})
} else {
this.redisSubscriber = createClient({
username: process.env.REDIS_USERNAME || undefined,
password: process.env.REDIS_PASSWORD || undefined,
socket: {
host: process.env.REDIS_HOST || 'localhost',
port: parseInt(process.env.REDIS_PORT || '6379'),
tls: process.env.REDIS_TLS === 'true',
cert: process.env.REDIS_CERT ? Buffer.from(process.env.REDIS_CERT, 'base64') : undefined,
key: process.env.REDIS_KEY ? Buffer.from(process.env.REDIS_KEY, 'base64') : undefined,
ca: process.env.REDIS_CA ? Buffer.from(process.env.REDIS_CA, 'base64') : undefined
}
})
}
this.sseStreamer = sseStreamer
}

Expand Down
3 changes: 0 additions & 3 deletions packages/server/src/queue/UpsertQueue.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
import dotenv from 'dotenv'
import { DataSource } from 'typeorm'
import { IComponentNodes, IExecuteDocStoreUpsert, IExecuteFlowParams, IExecuteProcessLoader, IExecuteVectorStoreInsert } from '../Interface'
import { Telemetry } from '../utils/telemetry'
Expand All @@ -9,8 +8,6 @@ import { executeDocStoreUpsert, insertIntoVectorStore, processLoader } from '../
import { RedisOptions } from 'bullmq'
import logger from '../utils/logger'

dotenv.config()

interface UpsertQueueOptions {
appDataSource: DataSource
telemetry: Telemetry
Expand Down
32 changes: 18 additions & 14 deletions packages/server/src/utils/rateLimit.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,20 +13,24 @@ export class RateLimiterManager {

constructor() {
if (process.env.MODE === MODE.QUEUE) {
this.redisClient = new Redis({
host: process.env.REDIS_HOST || 'localhost',
port: parseInt(process.env.REDIS_PORT || '6379'),
username: process.env.REDIS_USERNAME || undefined,
password: process.env.REDIS_PASSWORD || undefined,
tls:
process.env.REDIS_TLS === 'true'
? {
cert: process.env.REDIS_CERT ? Buffer.from(process.env.REDIS_CERT, 'base64') : undefined,
key: process.env.REDIS_KEY ? Buffer.from(process.env.REDIS_KEY, 'base64') : undefined,
ca: process.env.REDIS_CA ? Buffer.from(process.env.REDIS_CA, 'base64') : undefined
}
: undefined
})
if (process.env.REDIS_URL) {
this.redisClient = new Redis(process.env.REDIS_URL)
} else {
this.redisClient = new Redis({
host: process.env.REDIS_HOST || 'localhost',
port: parseInt(process.env.REDIS_PORT || '6379'),
username: process.env.REDIS_USERNAME || undefined,
password: process.env.REDIS_PASSWORD || undefined,
tls:
process.env.REDIS_TLS === 'true'
? {
cert: process.env.REDIS_CERT ? Buffer.from(process.env.REDIS_CERT, 'base64') : undefined,
key: process.env.REDIS_KEY ? Buffer.from(process.env.REDIS_KEY, 'base64') : undefined,
ca: process.env.REDIS_CA ? Buffer.from(process.env.REDIS_CA, 'base64') : undefined
}
: undefined
})
}
}
}

Expand Down

0 comments on commit 13d69b7

Please sign in to comment.