This repository has been archived by the owner on Jul 28, 2023. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 82
/
worker-manager.js
96 lines (77 loc) · 2.29 KB
/
worker-manager.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
var Worker = require('./worker')
/**
 * Tracks worker state across runs.
 *
 * Keeps a registry of Worker instances keyed by id and, while polling, syncs
 * that registry with the worker list reported by a client.
 *
 * Instance fields:
 * - `_pollHandle`    timer handle of the next scheduled poll, or null
 * - `workers`        map of worker id -> Worker
 * - `isPolling`      true from the start of a poll until the next one is due
 * - `shouldShutdown` set by stopPolling(); prevents any further polling
 */
function WorkerManager () {
  this._pollHandle = null
  this.workers = {}
  this.isPolling = false
  this.shouldShutdown = false
}

/**
 * Creates a Worker from `workerData` and stores it under `workerData.id`.
 * A worker already registered under that id is unregistered first.
 *
 * @param {Object} workerData - passed to the Worker constructor; `id` is the registry key
 * @returns {Worker} the newly created worker
 */
WorkerManager.prototype.registerWorker = function registerWorker (workerData) {
  var existing = this.workers[workerData.id]
  if (existing) {
    this.unregisterWorker(existing)
  }

  var worker = new Worker(workerData)
  worker.emit('status', worker.status)
  this.workers[workerData.id] = worker
  return worker
}

/**
 * Removes `worker` from the registry.
 * Emits 'delete' (with the worker as payload) first, then drops every
 * listener attached to the worker.
 *
 * @param {Worker} worker - the worker to remove
 * @returns {Worker} the same worker
 */
WorkerManager.prototype.unregisterWorker = function unregisterWorker (worker) {
  worker.emit('delete', worker)
  worker.removeAllListeners()
  delete this.workers[worker.id]
  return worker
}

/**
 * Copies the own enumerable properties of `workerData` onto the registered
 * worker with the same id and emits 'status' when the status value changed.
 * Does nothing when no worker is registered under `workerData.id`.
 *
 * @param {Object} workerData - new property values; `id` selects the worker
 */
WorkerManager.prototype.updateWorker = function updateWorker (workerData) {
  var worker = this.workers[workerData.id]
  if (!worker) {
    return
  }

  var prevStatus = worker.status
  Object.keys(workerData).forEach(function (key) {
    // JSON-parsed data can carry an own '__proto__' key; assigning it would
    // swap the worker's prototype (and with it `emit`) instead of storing data.
    if (key === '__proto__') {
      return
    }
    worker[key] = workerData[key]
  })

  if (worker.status !== prevStatus) {
    worker.emit('status', worker.status)
  }
}

/**
 * Starts a polling loop: asks `client.getWorkers` for the current worker
 * list, updates registered workers that are still listed, unregisters those
 * that are not, then schedules the next poll after `pollingTimeout`.
 *
 * No-op while a poll is already in progress or after stopPolling().
 * If getWorkers reports an error the loop ends and `callback` (when given)
 * receives the error; `callback` is not invoked on success.
 *
 * @param {Object} client - must provide getWorkers(function (err, workers))
 * @param {number} pollingTimeout - delay passed to setTimeout between polls
 * @param {Function} [callback] - called with the error when a poll fails
 */
WorkerManager.prototype.startPolling = function startPolling (client, pollingTimeout, callback) {
  if (this.isPolling || this.shouldShutdown) {
    return
  }

  var self = this
  this.isPolling = true

  client.getWorkers(function (err, updatedWorkers) {
    if (err) {
      self.isPolling = false
      return (callback ? callback(err) : null)
    }

    // Prototype-less lookup table so ids such as 'constructor' or '__proto__'
    // behave like ordinary keys.
    var activeWorkers = (updatedWorkers || []).reduce(function (byId, worker) {
      byId[worker.id] = worker
      return byId
    }, Object.create(null))

    Object.keys(self.workers).forEach(function (workerId) {
      var registered = self.workers[workerId]
      if (!registered) {
        // A 'status'/'delete' listener already removed this worker during the pass.
        return
      }

      if (activeWorkers[workerId]) {
        // process updates
        self.updateWorker(activeWorkers[workerId])
      } else {
        // process deletions
        self.unregisterWorker(registered)
      }
    })

    if (self.shouldShutdown) {
      // stopPolling() ran while the request was in flight: do not arm a new
      // timer, it would only keep the process alive for another interval.
      self.isPolling = false
      return
    }

    self._pollHandle = setTimeout(function () {
      self._pollHandle = null
      self.isPolling = false
      self.startPolling(client, pollingTimeout, callback)
    }, pollingTimeout)
  })
}

/**
 * Stops the polling loop for good: cancels the pending timer, if any, and
 * sets `shouldShutdown` so later startPolling() calls return immediately.
 */
WorkerManager.prototype.stopPolling = function stopPolling () {
  if (this._pollHandle) {
    clearTimeout(this._pollHandle)
    this._pollHandle = null
    // The cancelled timer was the only thing that would have reset this flag.
    this.isPolling = false
  }
  this.shouldShutdown = true
}
// Build one WorkerManager and make it the module's export, so consumers
// share a single instance rather than constructing their own.
var sharedManager = new WorkerManager()
module.exports = sharedManager