diff --git a/packages/embedder/EmbeddingsJobQueueStream.ts b/packages/embedder/EmbeddingsJobQueueStream.ts index cb995821f4b..49c3e4c9e58 100644 --- a/packages/embedder/EmbeddingsJobQueueStream.ts +++ b/packages/embedder/EmbeddingsJobQueueStream.ts @@ -41,16 +41,20 @@ export class EmbeddingsJobQueueStream implements AsyncIterableIterator { .returningAll() .executeTakeFirst() } - const job = (await getJob(false)) || (await getJob(true)) - if (!job) { - Logger.log('JobQueueStream: no jobs found') - // queue is empty, so sleep for a while - await sleep(ms('10s')) + try { + const job = (await getJob(false)) || (await getJob(true)) + if (!job) { + Logger.log('JobQueueStream: no jobs found') + // queue is empty, so sleep for a while + await sleep(ms('10s')) + return this.next() + } + await this.orchestrator.runStep(job) + return {done: false, value: job} + } catch (e) { + await sleep(1000) return this.next() } - - await this.orchestrator.runStep(job) - return {done: false, value: job} } return() { return Promise.resolve({done: true as const, value: undefined}) diff --git a/packages/server/postgres/getKysely.ts b/packages/server/postgres/getKysely.ts index f2824a800d0..0a3256081d3 100644 --- a/packages/server/postgres/getKysely.ts +++ b/packages/server/postgres/getKysely.ts @@ -3,21 +3,26 @@ import getPg from './getPg' import {DB} from './pg.d' let kysely: Kysely | undefined + +const makeKysely = () => { + const nextPg = getPg() + nextPg.on('poolChange' as any, makeKysely) + return new Kysely({ + dialect: new PostgresDialect({ + pool: nextPg + }) + // ,log(event) { + // if (event.level === 'query') { + // console.log(event.query.sql) + // console.log(event.query.parameters) + // } + // } + }) +} + const getKysely = () => { if (!kysely) { - const pg = getPg() - kysely = new Kysely({ - dialect: new PostgresDialect({ - pool: pg - }) - // query logging, if you'd like it: - // log(event) { - // if (event.level === 'query') { - // console.log(event.query.sql) - // console.log(event.query.parameters) - // } - // } - }) + kysely = makeKysely() } return kysely } diff --git a/packages/server/postgres/getPg.ts b/packages/server/postgres/getPg.ts index f9d474c9026..2edc5ff4729 100644 --- a/packages/server/postgres/getPg.ts +++ b/packages/server/postgres/getPg.ts @@ -1,12 +1,31 @@ import {Pool} from 'pg' +import sleep from '../../client/utils/sleep' import getPgConfig from './getPgConfig' const config = getPgConfig() +const graceFullyReconnect = async () => { + for (let i = 0; i < 1e6; i++) { + const nextPool = new Pool(getPgConfig()) + try { + const testClient = await nextPool.connect() + testClient.release() + nextPool.on('error', graceFullyReconnect) + const oldPool = pool + pool = nextPool + oldPool?.emit('changePool') + return + } catch (e) { + await sleep(1000) + } + } +} + let pool: Pool | undefined const getPg = () => { if (!pool) { pool = new Pool(config) + pool.on('error', graceFullyReconnect) } return pool }