-
Notifications
You must be signed in to change notification settings - Fork 10.3k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
chore: Migrate custom store and queue to TypeScript #25389
Changes from 4 commits
3d0d3e4
05809dc
2d53ce8
a943c62
94025b2
4551a52
2e6baf0
ef7dfc8
6158818
56bd7e0
07c4f17
7b8564d
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,32 +1,40 @@ | ||
function MemoryStoreWithPriorityBuckets() { | ||
import { Store } from "better-queue" | ||
|
||
// getRunningTasks is an extension to the interface, and is used in the tests | ||
interface IGatsbyBetterStore<T> extends Store<T> { | ||
pvdz marked this conversation as resolved.
Show resolved
Hide resolved
|
||
getRunningTasks(cb: (error: any, runningTasks: any) => void): void | ||
} | ||
|
||
export function MemoryStoreWithPriorityBuckets<T>(): IGatsbyBetterStore<T> { | ||
chooban marked this conversation as resolved.
Show resolved
Hide resolved
|
||
type RunningTasks = Record<string, T> | ||
let uuid = 0 | ||
|
||
/** | ||
* Task ids grouped by priority | ||
*/ | ||
const queueMap = new Map() | ||
const queueMap = new Map<number, string[]>() | ||
|
||
/** | ||
* Task id to task lookup | ||
*/ | ||
const tasks = new Map() | ||
const tasks = new Map<string, T>() | ||
|
||
/** | ||
* Task id to priority lookup | ||
*/ | ||
const taskIdToPriority = new Map() | ||
const taskIdToPriority = new Map<string, number>() | ||
|
||
/** | ||
* Lock to running tasks object | ||
*/ | ||
const running = {} | ||
const running: Record<string, RunningTasks> = {} | ||
|
||
let priorityKeys = [] | ||
const updatePriorityKeys = () => { | ||
let priorityKeys: number[] = [] | ||
const updatePriorityKeys = (): void => { | ||
priorityKeys = Array.from(queueMap.keys()).sort((a, b) => b - a) | ||
} | ||
|
||
const addTaskWithPriority = (taskId, priority) => { | ||
const addTaskWithPriority = (taskId: string, priority: number): boolean => { | ||
let needToUpdatePriorityKeys = false | ||
let priorityTasks = queueMap.get(priority) | ||
if (!priorityTasks) { | ||
|
@@ -41,36 +49,39 @@ function MemoryStoreWithPriorityBuckets() { | |
} | ||
|
||
return { | ||
connect: function (cb) { | ||
connect: function (cb): void { | ||
cb(null, tasks.size) | ||
}, | ||
getTask: function (taskId, cb) { | ||
getTask: function (taskId, cb): void { | ||
// @ts-ignore | ||
cb(null, tasks.get(taskId)) | ||
}, | ||
deleteTask: function (taskId, cb) { | ||
deleteTask: function (taskId, cb): void { | ||
if (tasks.get(taskId)) { | ||
tasks.delete(taskId) | ||
const priority = taskIdToPriority.get(taskId) | ||
const priorityTasks = queueMap.get(priority) | ||
priorityTasks.splice(priorityTasks.indexOf(taskId), 1) | ||
taskIdToPriority.delete(taskId) | ||
if (priority) { | ||
const priorityTasks = queueMap.get(priority) || [] | ||
chooban marked this conversation as resolved.
Show resolved
Hide resolved
|
||
priorityTasks.splice(priorityTasks.indexOf(taskId), 1) | ||
taskIdToPriority.delete(taskId) | ||
} | ||
} | ||
cb() | ||
}, | ||
putTask: function (taskId, task, priority = 0, cb) { | ||
putTask: function (taskId, task, priority = 0, cb): void { | ||
const oldTask = tasks.get(taskId) | ||
tasks.set(taskId, task) | ||
let needToUpdatePriorityKeys = false | ||
if (oldTask) { | ||
const oldPriority = taskIdToPriority.get(taskId) | ||
|
||
if (oldPriority !== priority) { | ||
const oldPriorityTasks = queueMap.get(oldPriority) | ||
if (oldPriority && oldPriority !== priority) { | ||
const oldPriorityTasks = queueMap.get(oldPriority) || [] | ||
chooban marked this conversation as resolved.
Show resolved
Hide resolved
|
||
oldPriorityTasks.splice(oldPriorityTasks.indexOf(taskId), 1) | ||
|
||
if ( | ||
addTaskWithPriority(taskId, priority) || | ||
oldPriority.length === 0 | ||
oldPriorityTasks.length === 0 | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Was this a bug? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yes, I believe so. The I think what it's saying is if calling code attempts to add the same task twice at different priorities, and removing the older task results in an empty set of tasks for the original priority, then the priority keys need updating. I can try and write a test to cover that. |
||
) { | ||
needToUpdatePriorityKeys = true | ||
} | ||
|
@@ -82,29 +93,33 @@ function MemoryStoreWithPriorityBuckets() { | |
if (needToUpdatePriorityKeys) { | ||
updatePriorityKeys() | ||
} | ||
cb() | ||
cb(null) | ||
}, | ||
takeFirstN: function (n, cb) { | ||
const lockId = uuid++ | ||
takeFirstN: function (n, cb): void { | ||
const lockId = `` + uuid++ | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Nit: I think I would prefer |
||
let remainingTasks = n | ||
let needToUpdatePriorityKeys = false | ||
let haveSomeTasks = false | ||
const tasksToRun = {} | ||
const tasksToRun: RunningTasks = {} | ||
|
||
for (const priority of priorityKeys) { | ||
const taskWithSamePriority = queueMap.get(priority) | ||
const grabbedTaskIds = taskWithSamePriority.splice(0, remainingTasks) | ||
const tasksWithSamePriority = queueMap.get(priority) | ||
const grabbedTaskIds = | ||
tasksWithSamePriority?.splice(0, remainingTasks) ?? [] | ||
grabbedTaskIds.forEach(taskId => { | ||
// add task to task that will run | ||
// and remove it from waiting list | ||
tasksToRun[taskId] = tasks.get(taskId) | ||
tasks.delete(taskId) | ||
taskIdToPriority.delete(taskId) | ||
haveSomeTasks = true | ||
const task = tasks.get(taskId) | ||
if (task) { | ||
tasksToRun[taskId] = task | ||
tasks.delete(taskId) | ||
taskIdToPriority.delete(taskId) | ||
haveSomeTasks = true | ||
} | ||
}) | ||
|
||
remainingTasks -= grabbedTaskIds.length | ||
if (taskWithSamePriority.length === 0) { | ||
if (tasksWithSamePriority?.length === 0) { | ||
queueMap.delete(priority) | ||
needToUpdatePriorityKeys = true | ||
} | ||
|
@@ -123,26 +138,26 @@ function MemoryStoreWithPriorityBuckets() { | |
|
||
cb(null, lockId) | ||
}, | ||
takeLastN: function (n, cb) { | ||
takeLastN: function (n, cb): void { | ||
// This is not really used by Gatsby, but will be implemented for | ||
// completion in easiest possible way (so not very performant). | ||
// Mostly done so generic test suite used by other stores passes. | ||
// This is mostly C&P from takeFirstN, with array reversal and different | ||
// splice args | ||
const lockId = uuid++ | ||
const lockId = `` + uuid++ | ||
chooban marked this conversation as resolved.
Show resolved
Hide resolved
|
||
let remainingTasks = n | ||
let needToUpdatePriorityKeys = false | ||
let haveSomeTasks = false | ||
const tasksToRun = {} | ||
|
||
for (const priority of priorityKeys.reverse()) { | ||
const taskWithSamePriority = queueMap.get(priority) | ||
const tasksWithSamePriority = queueMap.get(priority) || [] | ||
chooban marked this conversation as resolved.
Show resolved
Hide resolved
|
||
const deleteCount = Math.min( | ||
remainingTasks, | ||
taskWithSamePriority.length | ||
tasksWithSamePriority.length | ||
) | ||
const grabbedTaskIds = taskWithSamePriority.splice( | ||
taskWithSamePriority.length - deleteCount, | ||
const grabbedTaskIds = tasksWithSamePriority.splice( | ||
tasksWithSamePriority.length - deleteCount, | ||
deleteCount | ||
) | ||
grabbedTaskIds.forEach(taskId => { | ||
|
@@ -155,7 +170,7 @@ function MemoryStoreWithPriorityBuckets() { | |
}) | ||
|
||
remainingTasks -= grabbedTaskIds.length | ||
if (taskWithSamePriority.length === 0) { | ||
if (tasksWithSamePriority.length === 0) { | ||
queueMap.delete(priority) | ||
needToUpdatePriorityKeys = true | ||
} | ||
|
@@ -174,17 +189,15 @@ function MemoryStoreWithPriorityBuckets() { | |
|
||
cb(null, lockId) | ||
}, | ||
getRunningTasks: function (cb) { | ||
getRunningTasks: function (cb): void { | ||
cb(null, running) | ||
}, | ||
getLock: function (lockId, cb) { | ||
getLock: function (lockId, cb): void { | ||
cb(null, running[lockId]) | ||
}, | ||
releaseLock: function (lockId, cb) { | ||
releaseLock: function (lockId, cb): void { | ||
delete running[lockId] | ||
cb() | ||
cb(null) | ||
}, | ||
} | ||
} | ||
|
||
module.exports = MemoryStoreWithPriorityBuckets |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nit: you're folding up the declaration with the initialization above (correctly, I think), why not here?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Just missed it, I'm afraid. Pushing up a commit for it now.