Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: Migrate custom store and queue to TypeScript #25389

Merged
merged 12 commits into from
Jul 13, 2020
1 change: 1 addition & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
"@babel/runtime": "^7.10.3",
"@lerna/prompt": "3.18.5",
"@types/babel__code-frame": "^7.0.1",
"@types/better-queue": "^3.8.2",
"@types/bluebird": "^3.5.30",
"@types/cache-manager": "^2.10.2",
"@types/common-tags": "^1.8.0",
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
const MemoryStoreWithPriorityBuckets = require(`../better-queue-custom-store`)
const pify = require(`pify`)
import { memoryStoreWithPriorityBuckets } from "../better-queue-custom-store"
import pify from "pify"

// those are tests copied from https://github.com/diamondio/better-queue-store-test/blob/master/tester.js
// and converted from mocha to jest + used pify to make it nicer to read than callback chain
Expand All @@ -18,7 +18,7 @@ describe(`Custom better-queue memory store`, () => {
`releaseLock`,
]
beforeEach(() => {
store = MemoryStoreWithPriorityBuckets()
store = memoryStoreWithPriorityBuckets()
functions.forEach(fnName => {
if (store[fnName]) {
store[fnName] = pify(store[fnName])
Expand Down Expand Up @@ -51,10 +51,8 @@ describe(`Custom better-queue memory store`, () => {
await store.putTask(`task2`, { value: `secret 2` }, 1)
await store.putTask(`task3`, { value: `secret 3` }, 1)

let lockId, tasks

lockId = await store.takeLastN(2)
tasks = await store.getLock(lockId)
let lockId: string = await store.takeLastN(2)
let tasks: any = await store.getLock(lockId)

// should get the third task
expect(tasks.task3.value).toBe(`secret 3`)
Expand All @@ -80,10 +78,8 @@ describe(`Custom better-queue memory store`, () => {
await store.putTask(`task2`, { value: `secret 2` }, 1)
await store.putTask(`task3`, { value: `secret 3` }, 1)

let lockId, tasks

lockId = await store.takeFirstN(2)
tasks = await store.getLock(lockId)
let lockId = await store.takeFirstN(2)
let tasks = await store.getLock(lockId)

// should get the first task
expect(tasks.task1.value).toBe(`secret 1`)
Expand All @@ -109,12 +105,10 @@ describe(`Custom better-queue memory store`, () => {
await store.putTask(`task2`, { value: `secret 2` }, 1)
await store.putTask(`task3`, { value: `secret 3` }, 1)

const lock1 = await store.takeFirstN(1)
const lock2 = await store.takeLastN(1)

let workers
const lock1: string = await store.takeFirstN(1)
const lock2: string = await store.takeLastN(1)

workers = await store.getRunningTasks()
let workers = await store.getRunningTasks()

// should have first lock
expect(workers[lock1]).toBeDefined()
Expand Down Expand Up @@ -151,7 +145,7 @@ describe(`Custom better-queue memory store`, () => {
await store.deleteTask(`task2`)

// take 2
const lockId = await store.takeFirstN(2)
const lockId: string = await store.takeFirstN(2)
const tasks = await store.getLock(lockId)

// should get the first task
Expand All @@ -173,9 +167,8 @@ describe(`Custom better-queue memory store`, () => {
await store.putTask(`task4`, { value: `secret 4` }, 2)

// take first 2
let lockId, tasks
lockId = await store.takeFirstN(2)
tasks = await store.getLock(lockId)
let lockId: string = await store.takeFirstN(2)
let tasks = await store.getLock(lockId)

// should get the third task
expect(tasks.task3.value).toBe(`secret 3`)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,32 +1,35 @@
function MemoryStoreWithPriorityBuckets() {
import { Store } from "better-queue"

export function memoryStoreWithPriorityBuckets<T>(): Store<T> {
type RunningTasks = Record<string, T>
let uuid = 0

/**
* Task ids grouped by priority
*/
const queueMap = new Map()
const queueMap = new Map<number, Array<string>>()

/**
* Task id to task lookup
*/
const tasks = new Map()
const tasks = new Map<string, T>()

/**
* Task id to priority lookup
*/
const taskIdToPriority = new Map()
const taskIdToPriority = new Map<string, number>()

/**
* Lock to running tasks object
*/
const running = {}
const running: Record<string, RunningTasks> = {}

let priorityKeys = []
const updatePriorityKeys = () => {
let priorityKeys: Array<number> = []
const updatePriorityKeys = (): void => {
priorityKeys = Array.from(queueMap.keys()).sort((a, b) => b - a)
}

const addTaskWithPriority = (taskId, priority) => {
const addTaskWithPriority = (taskId: string, priority: number): boolean => {
let needToUpdatePriorityKeys = false
let priorityTasks = queueMap.get(priority)
if (!priorityTasks) {
Expand All @@ -41,36 +44,39 @@ function MemoryStoreWithPriorityBuckets() {
}

return {
connect: function (cb) {
connect: function (cb): void {
cb(null, tasks.size)
},
getTask: function (taskId, cb) {
getTask: function (taskId, cb): void {
// @ts-ignore
cb(null, tasks.get(taskId))
},
deleteTask: function (taskId, cb) {
deleteTask: function (taskId, cb): void {
if (tasks.get(taskId)) {
tasks.delete(taskId)
const priority = taskIdToPriority.get(taskId)
const priorityTasks = queueMap.get(priority)
priorityTasks.splice(priorityTasks.indexOf(taskId), 1)
taskIdToPriority.delete(taskId)
if (priority) {
const priorityTasks = queueMap.get(priority) ?? []
priorityTasks.splice(priorityTasks.indexOf(taskId), 1)
taskIdToPriority.delete(taskId)
}
}
cb()
},
putTask: function (taskId, task, priority = 0, cb) {
putTask: function (taskId, task, priority = 0, cb): void {
const oldTask = tasks.get(taskId)
tasks.set(taskId, task)
let needToUpdatePriorityKeys = false
if (oldTask) {
const oldPriority = taskIdToPriority.get(taskId)

if (oldPriority !== priority) {
const oldPriorityTasks = queueMap.get(oldPriority)
if (oldPriority && oldPriority !== priority) {
const oldPriorityTasks = queueMap.get(oldPriority) ?? []
oldPriorityTasks.splice(oldPriorityTasks.indexOf(taskId), 1)

if (
addTaskWithPriority(taskId, priority) ||
oldPriority.length === 0
addTaskWithPriority(taskId, priority) // ||
// oldPriorityTasks.length === 0
) {
needToUpdatePriorityKeys = true
}
Expand All @@ -82,29 +88,33 @@ function MemoryStoreWithPriorityBuckets() {
if (needToUpdatePriorityKeys) {
updatePriorityKeys()
}
cb()
cb(null)
},
takeFirstN: function (n, cb) {
const lockId = uuid++
takeFirstN: function (n, cb): void {
const lockId = String(uuid++)
let remainingTasks = n
let needToUpdatePriorityKeys = false
let haveSomeTasks = false
const tasksToRun = {}
const tasksToRun: RunningTasks = {}

for (const priority of priorityKeys) {
const taskWithSamePriority = queueMap.get(priority)
const grabbedTaskIds = taskWithSamePriority.splice(0, remainingTasks)
const tasksWithSamePriority = queueMap.get(priority)
const grabbedTaskIds =
tasksWithSamePriority?.splice(0, remainingTasks) ?? []
grabbedTaskIds.forEach(taskId => {
// add task to task that will run
// and remove it from waiting list
tasksToRun[taskId] = tasks.get(taskId)
tasks.delete(taskId)
taskIdToPriority.delete(taskId)
haveSomeTasks = true
const task = tasks.get(taskId)
if (task) {
tasksToRun[taskId] = task
tasks.delete(taskId)
taskIdToPriority.delete(taskId)
haveSomeTasks = true
}
})

remainingTasks -= grabbedTaskIds.length
if (taskWithSamePriority.length === 0) {
if (tasksWithSamePriority?.length === 0) {
queueMap.delete(priority)
needToUpdatePriorityKeys = true
}
Expand All @@ -123,26 +133,26 @@ function MemoryStoreWithPriorityBuckets() {

cb(null, lockId)
},
takeLastN: function (n, cb) {
takeLastN: function (n, cb): void {
// This is not really used by Gatsby, but will be implemented for
// completion in easiest possible way (so not very performant).
// Mostly done so generic test suite used by other stores passes.
// This is mostly C&P from takeFirstN, with array reversal and different
// splice args
const lockId = uuid++
const lockId = String(uuid++)
let remainingTasks = n
let needToUpdatePriorityKeys = false
let haveSomeTasks = false
const tasksToRun = {}

for (const priority of priorityKeys.reverse()) {
const taskWithSamePriority = queueMap.get(priority)
const tasksWithSamePriority = queueMap.get(priority) ?? []
const deleteCount = Math.min(
remainingTasks,
taskWithSamePriority.length
tasksWithSamePriority.length
)
const grabbedTaskIds = taskWithSamePriority.splice(
taskWithSamePriority.length - deleteCount,
const grabbedTaskIds = tasksWithSamePriority.splice(
tasksWithSamePriority.length - deleteCount,
deleteCount
)
grabbedTaskIds.forEach(taskId => {
Expand All @@ -155,7 +165,7 @@ function MemoryStoreWithPriorityBuckets() {
})

remainingTasks -= grabbedTaskIds.length
if (taskWithSamePriority.length === 0) {
if (tasksWithSamePriority.length === 0) {
queueMap.delete(priority)
needToUpdatePriorityKeys = true
}
Expand All @@ -174,17 +184,19 @@ function MemoryStoreWithPriorityBuckets() {

cb(null, lockId)
},
getRunningTasks: function (cb) {
// @ts-ignore
// getRunningTasks is an extension to the interface, and is only used in the tests
getRunningTasks: function (
cb: (err?: any, value?: Record<string, RunningTasks>) => void
): void {
cb(null, running)
},
getLock: function (lockId, cb) {
getLock: function (lockId, cb): void {
cb(null, running[lockId])
},
releaseLock: function (lockId, cb) {
releaseLock: function (lockId, cb): void {
delete running[lockId]
cb()
cb(null)
},
}
}

module.exports = MemoryStoreWithPriorityBuckets
13 changes: 6 additions & 7 deletions packages/gatsby/src/query/graphql-runner.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,11 @@ import GraphQLSpanTracer from "./graphql-span-tracer"

type Query = string | Source

export interface IGraphQLRunnerOptions {
collectStats?: boolean
graphqlTracing?: boolean
}

export class GraphQLRunner {
parseCache: Map<Query, DocumentNode>

Expand All @@ -38,13 +43,7 @@ export class GraphQLRunner {

constructor(
protected store: Store<IGatsbyState>,
{
collectStats,
graphqlTracing,
}: {
collectStats?: boolean
graphqlTracing?: boolean
} = {}
{ collectStats, graphqlTracing }: IGraphQLRunnerOptions = {}
) {
const { schema, schemaCustomization } = this.store.getState()

Expand Down
Loading