Skip to content

Commit

Permalink
refactor: extract common control code to queueProvider & jobProvider (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
felixmosh authored May 7, 2021
1 parent 715882e commit 086cfe6
Show file tree
Hide file tree
Showing 10 changed files with 120 additions and 142 deletions.
41 changes: 29 additions & 12 deletions src/routes/apiRouter.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
import { Router } from 'express'
import { ParamsDictionary, RequestHandler } from 'express-serve-static-core'
import { cleanAll } from './handlers/cleanAll'
import { cleanJob } from './handlers/cleanJob'
import { errorHandler } from './handlers/errorHandler'
Expand All @@ -8,18 +7,36 @@ import { promoteJob } from './handlers/promoteJob'
import { queuesHandler } from './handlers/queues'
import { retryAll } from './handlers/retryAll'
import { retryJob } from './handlers/retryJob'

const wrapAsync = <Params extends ParamsDictionary>(
fn: RequestHandler<Params>,
): RequestHandler<Params> => async (req, res, next) =>
Promise.resolve(fn(req, res, next)).catch(next)
import { jobProvider } from './middlewares/jobProvider'
import { queueProvider } from './middlewares/queueProvider'
import { wrapAsync } from './middlewares/wrapAsync'

export const apiRouter = Router()
.get('/queues', wrapAsync(queuesHandler))
.put('/queues/:queueName/retry', wrapAsync(retryAll))
.put('/queues/:queueName/:id/retry', wrapAsync(retryJob))
.put('/queues/:queueName/:id/clean', wrapAsync(cleanJob))
.put('/queues/:queueName/:id/promote', wrapAsync(promoteJob))
.get('/queues/:queueName/:id/logs', wrapAsync(jobLogs))
.put('/queues/:queueName/clean/:queueStatus', wrapAsync(cleanAll))
.put('/queues/:queueName/retry', queueProvider(), wrapAsync(retryAll))
.put(
'/queues/:queueName/:jobId/retry',
[queueProvider(), jobProvider()],
wrapAsync(retryJob),
)
.put(
'/queues/:queueName/:jobId/clean',
[queueProvider(), jobProvider()],
wrapAsync(cleanJob),
)
.put(
'/queues/:queueName/:jobId/promote',
[queueProvider(), jobProvider()],
wrapAsync(promoteJob),
)
.put(
'/queues/:queueName/clean/:queueStatus',
queueProvider(),
wrapAsync(cleanAll),
)
.get(
'/queues/:queueName/:jobId/logs',
[queueProvider({ skipReadOnlyModeCheck: true }), jobProvider()],
wrapAsync(jobLogs),
)
.use(errorHandler)
19 changes: 3 additions & 16 deletions src/routes/handlers/cleanAll.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { Request, RequestHandler, Response } from 'express-serve-static-core'
import { BullBoardQueues, JobCleanStatus } from '../../@types/app'
import { JobCleanStatus } from '../../@types/app'

type RequestParams = {
queueName: string
Expand All @@ -10,24 +10,11 @@ export const cleanAll: RequestHandler<RequestParams> = async (
req: Request,
res: Response,
) => {
const { queueName, queueStatus } = req.params
const { bullBoardQueues } = req.app.locals as {
bullBoardQueues: BullBoardQueues
}
const { queueStatus } = req.params
const { queue } = res.locals

const GRACE_TIME_MS = 5000

const queue = bullBoardQueues.get(queueName)
if (!queue) {
return res.status(404).send({
error: 'Queue not found',
})
} else if (queue.readOnlyMode) {
return res.status(405).send({
error: 'Method not allowed on read only queue',
})
}

await queue.clean(queueStatus as any, GRACE_TIME_MS)

return res.sendStatus(200)
Expand Down
33 changes: 6 additions & 27 deletions src/routes/handlers/cleanJob.ts
Original file line number Diff line number Diff line change
@@ -1,32 +1,11 @@
import { Request, RequestHandler, Response } from 'express-serve-static-core'
import { BullBoardQueues } from '../../@types/app'
import { QueueJob } from '../../@types/app'

export const cleanJob: RequestHandler = async (req: Request, res: Response) => {
const { bullBoardQueues } = req.app.locals as {
bullBoardQueues: BullBoardQueues
}
const { queueName, id } = req.params
const queue = bullBoardQueues.get(queueName)

if (!queue) {
return res.status(404).send({
error: 'Queue not found',
})
} else if (queue.readOnlyMode) {
return res.status(405).send({
error: 'Method not allowed on read only queue',
})
}

const job = await queue.getJob(id)

if (!job) {
throw new Error('xxxx')

return res.status(404).send({
error: 'Job not found',
})
}
export const cleanJob: RequestHandler = async (
_req: Request,
res: Response,
) => {
const { job } = res.locals as { job: QueueJob }

await job.remove()

Expand Down
24 changes: 3 additions & 21 deletions src/routes/handlers/jobLogs.ts
Original file line number Diff line number Diff line change
@@ -1,28 +1,10 @@
import { Request, RequestHandler, Response } from 'express-serve-static-core'
import { BullBoardQueues } from '../../@types/app'

export const jobLogs: RequestHandler = async (req: Request, res: Response) => {
const { bullBoardQueues } = req.app.locals as {
bullBoardQueues: BullBoardQueues
}
const { queueName, id } = req.params
const queue = bullBoardQueues.get(queueName)
const { jobId } = req.params
const { queue } = res.locals

if (!queue) {
return res.status(404).send({
error: 'Queue not found',
})
}

const job = await queue.getJob(id)

if (!job) {
return res.status(404).send({
error: 'Job not found',
})
}

const logs = await queue.getJobLogs(id)
const logs = await queue.getJobLogs(jobId)

return res.json(logs)
}
28 changes: 3 additions & 25 deletions src/routes/handlers/promoteJob.ts
Original file line number Diff line number Diff line change
@@ -1,33 +1,11 @@
import { Request, RequestHandler, Response } from 'express-serve-static-core'
import { BullBoardQueues } from '../../@types/app'
import { QueueJob } from '../../@types/app'

export const promoteJob: RequestHandler = async (
req: Request,
_req: Request,
res: Response,
) => {
const { bullBoardQueues } = req.app.locals as {
bullBoardQueues: BullBoardQueues
}
const { queueName, id } = req.params
const queue = bullBoardQueues.get(queueName)

if (!queue) {
return res.status(404).send({
error: 'Queue not found',
})
} else if (queue.readOnlyMode) {
return res.status(405).send({
error: 'Method not allowed on read only queue',
})
}

const job = await queue.getJob(id)

if (!job) {
return res.status(404).send({
error: 'Job not found',
})
}
const { job } = res.locals as { job: QueueJob }

await job.promote()

Expand Down
22 changes: 6 additions & 16 deletions src/routes/handlers/retryAll.ts
Original file line number Diff line number Diff line change
@@ -1,21 +1,11 @@
import { Request, RequestHandler, Response } from 'express-serve-static-core'
import { BaseAdapter } from '../../queueAdapters/base'

import { BullBoardQueues } from '../../@types/app'

export const retryAll: RequestHandler = async (req: Request, res: Response) => {
const { queueName } = req.params
const { bullBoardQueues } = req.app.locals as {
bullBoardQueues: BullBoardQueues
}

const queue = bullBoardQueues.get(queueName)
if (!queue) {
return res.status(404).send({ error: 'queue not found' })
} else if (queue.readOnlyMode) {
return res.status(405).send({
error: 'Method not allowed on read only queue',
})
}
export const retryAll: RequestHandler = async (
_req: Request,
res: Response,
) => {
const { queue } = res.locals as { queue: BaseAdapter }

const jobs = await queue.getJobs(['failed'])
await Promise.all(jobs.map((job) => job.retry()))
Expand Down
31 changes: 6 additions & 25 deletions src/routes/handlers/retryJob.ts
Original file line number Diff line number Diff line change
@@ -1,30 +1,11 @@
import { Request, RequestHandler, Response } from 'express-serve-static-core'
import { BullBoardQueues } from '../../@types/app'
import { QueueJob } from '../../@types/app'

export const retryJob: RequestHandler = async (req: Request, res: Response) => {
const { bullBoardQueues } = req.app.locals as {
bullBoardQueues: BullBoardQueues
}
const { queueName, id } = req.params
const queue = bullBoardQueues.get(queueName)

if (!queue) {
return res.status(404).send({
error: 'Queue not found',
})
} else if (queue.readOnlyMode) {
return res.status(405).send({
error: 'Method not allowed on read only queue',
})
}

const job = await queue.getJob(id)

if (!job) {
return res.status(404).send({
error: 'Job not found',
})
}
export const retryJob: RequestHandler = async (
_req: Request,
res: Response,
) => {
const { job } = res.locals as { job: QueueJob }

await job.retry()

Expand Down
25 changes: 25 additions & 0 deletions src/routes/middlewares/jobProvider.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
import { NextFunction, Request, Response } from 'express-serve-static-core'
import { BaseAdapter } from '../../queueAdapters/base'

export function jobProvider() {
return async (req: Request, res: Response, next: NextFunction) => {
const { jobId } = req.params
const { queue } = res.locals as { queue: BaseAdapter }

if (!jobId || !queue) {
return next(new Error('Invalid data'))
}

const job = await queue.getJob(jobId)

if (!job) {
return res.status(404).send({
error: 'Job not found',
})
}

res.locals.job = job

next()
}
}
33 changes: 33 additions & 0 deletions src/routes/middlewares/queueProvider.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
import { NextFunction, Request, Response } from 'express-serve-static-core'
import { BullBoardQueues } from '../../@types/app'

export function queueProvider({
skipReadOnlyModeCheck = false,
}: {
skipReadOnlyModeCheck?: boolean
} = {}) {
return async (req: Request, res: Response, next: NextFunction) => {
const { queueName } = req.params

if (typeof queueName === 'undefined') {
return next()
}

const { bullBoardQueues } = req.app.locals as {
bullBoardQueues: BullBoardQueues
}

const queue = bullBoardQueues.get(queueName)
if (!queue) {
return res.status(404).send({ error: 'Queue not found' })
} else if (queue.readOnlyMode && !skipReadOnlyModeCheck) {
return res.status(405).send({
error: 'Method not allowed on read only queue',
})
}

res.locals.queue = queue

next()
}
}
6 changes: 6 additions & 0 deletions src/routes/middlewares/wrapAsync.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
import { ParamsDictionary, RequestHandler } from 'express-serve-static-core'

export const wrapAsync = <Params extends ParamsDictionary>(
fn: RequestHandler<Params>,
): RequestHandler<Params> => async (req, res, next) =>
Promise.resolve(fn(req, res, next)).catch(next)

0 comments on commit 086cfe6

Please sign in to comment.